')
+
+ # Process paragraphs (sections)
+ for para_idx, paragraph in enumerate(page_data):
+ hocr_content.append(f'
')
+
+ # Process lines
+ for line_idx, line in enumerate(paragraph):
+ if not line:
+ continue
+
+ # Calculate line bounding box
+ line_boxes = [word["box"] for word in line if "box" in word]
+ if line_boxes:
+ x0 = min(box[0] for box in line_boxes)
+ y0 = min(box[1] for box in line_boxes)
+ x1 = max(box[2] for box in line_boxes)
+ y1 = max(box[3] for box in line_boxes)
+
+ hocr_content.append(f'')
+
+ # Process words
+ for word_idx, word in enumerate(line):
+ if "box" in word and "text" in word:
+ box = word["box"]
+ text = word["text"].replace("&", "&").replace("<", "<").replace(">", ">")
+ conf = word.get("conf", 95)
+ hocr_content.append(f'{text}')
+
+ hocr_content.append('')
+
+ hocr_content.append('
')
+
+ hocr_content.append('
')
+
+ hocr_content.append('')
+ hocr_content.append('')
+
+ # Write to file
+ with open(target, "w", encoding="utf-8") as f:
+ f.write("\n".join(hocr_content))
+
+ # Update metadata
+ data_update = {
+ "hocr": {
+ "complete": True,
+ "size": size_to_units(get_file_size(target, path_complete=True)),
+ "creation": get_current_time(),
+ }
+ }
+ update_json_file(data_file, data_update)
+
+ return target
+
+
def export_alto(path):
with open(path, encoding="utf-8") as f:
hocrfile = json.load(f)
diff --git a/server/src/utils/file.py b/server/src/utils/file.py
index 21eb1d99..de99bfdf 100644
--- a/server/src/utils/file.py
+++ b/server/src/utils/file.py
@@ -14,8 +14,10 @@
# from string import punctuation
FILES_PATH = environ.get("FILES_PATH", "_files")
+INPUTS_PATH = environ.get("INPUTS_PATH", "_inputs")
+OUTPUTS_PATH = environ.get("OUTPUTS_PATH", "_outputs")
TEMP_PATH = environ.get("TEMP_PATH", "_pending-files")
-PRIVATE_PATH = environ.get("PRIVATE_PATH", "_files/_private_spaces")
+PRIVATE_PATH = environ.get("PRIVATE_PATH", "_private_spaces")
API_TEMP_PATH = environ.get("API_TEMP_PATH", "_files/_tmp")
ALLOWED_EXTENSIONS = (
@@ -43,19 +45,112 @@
# FILESYSTEM UTILS
##################################################
-# Current file system structure
-# files
-# - folder1
-# - filename.(pdf/png/jpg/...)
-# - filename.(pdf/png/jpg/...) (the original submitted file)
-# - filename_extracted.txt (the text extracted initially)
-# - filename_changes.txt (the text changed by the user)
-# - conf.txt (the conf file of the OCR engine used)
-# - folder2
+# File system structure (three separate trees with mirrored structure):
+#
+# _inputs/ (original files - displayed in UI)
+# - folder1/
+# - subfolder/
+# - filename.pdf (the original submitted file)
+#
+# _files/ (metadata and processing data)
+# - folder1/
+# - _data.json (folder metadata)
+# - subfolder/
+# - _data.json (folder metadata)
+# - filename.pdf/ (document folder)
+# - _data.json (document metadata)
+# - _ocr_results/ (OCR JSON results per page)
+# - _pages/ (extracted pages as images)
+# - _layouts/ (layout definitions)
+# - _thumbnails/ (document thumbnails)
+# - _images/ (extracted images from layouts)
+#
+# _outputs/ (exported results)
+# - folder1/
+# - subfolder/
+# - filename.pdf/
+# - _txt.txt
+# - _pdf.pdf
+# - _pdf_indexed.pdf
+# - _index.csv
+# - _entities.json
+# - _images.zip
-def get_ner_file(path):
- with open(f"{path}/_export/_txt.txt", "rb") as file:
+def get_relative_path(full_path, is_private=False, private_space=None):
+    """
+    Extract the relative path from a full path by removing the base directory prefix.
+
+    Prefixes are matched on whole path components, so a sibling directory such
+    as '_inputs_old/x' is not mistaken for something inside '_inputs'. For a
+    private space, the tree folder ('_inputs', '_files' or '_outputs') that
+    follows the space ID is removed as well, so the result can be handed back
+    to get_inputs_path / get_files_path / get_outputs_path.
+
+    :param full_path: the full path (e.g., '_inputs/folder/file.pdf')
+    :param is_private: whether the path is in a private space
+    :param private_space: the private space ID if applicable
+    :return: the relative path (e.g., 'folder/file.pdf'); the input with
+        surrounding slashes stripped when no known prefix matches
+    """
+
+    def _strip_prefix(path, prefix):
+        # Return what follows `prefix`, or None when `path` is not under it.
+        prefix = prefix.rstrip("/")
+        if path == prefix:
+            return ""
+        if path.startswith(f"{prefix}/"):
+            return path[len(prefix):].strip("/")
+        return None
+
+    if is_private and private_space:
+        remainder = _strip_prefix(full_path, f"{PRIVATE_PATH}/{private_space}")
+        if remainder is not None:
+            # Private spaces hold their own _inputs/_files/_outputs trees.
+            head, _, tail = remainder.partition("/")
+            if head in ("_inputs", "_files", "_outputs"):
+                return tail.strip("/")
+            return remainder
+    for base in (INPUTS_PATH, FILES_PATH, OUTPUTS_PATH):
+        remainder = _strip_prefix(full_path, base)
+        if remainder is not None:
+            return remainder
+    return full_path.strip("/")
+
+
+def get_inputs_path(relative_path, is_private=False, private_space=None):
+    """
+    Build the location of an item inside the inputs tree.
+
+    :param relative_path: path of the item relative to the tree root
+    :param is_private: True when the item belongs to a private space
+    :param private_space: ID of that private space, if any
+    :return: '<inputs root>/<relative_path>' without a trailing slash; the
+        root is the private space's '_inputs' folder when both is_private
+        and private_space are truthy, INPUTS_PATH otherwise
+    """
+    use_private_tree = bool(is_private and private_space)
+    tree_root = (
+        f"{PRIVATE_PATH}/{private_space}/_inputs" if use_private_tree else INPUTS_PATH
+    )
+    return f"{tree_root}/{relative_path}".rstrip("/")
+
+
+def get_files_path(relative_path, is_private=False, private_space=None):
+    """
+    Build the location of an item inside the files (metadata) tree.
+
+    :param relative_path: path of the item relative to the tree root
+    :param is_private: True when the item belongs to a private space
+    :param private_space: ID of that private space, if any
+    :return: '<files root>/<relative_path>' without a trailing slash; the
+        root is the private space's '_files' folder when both is_private
+        and private_space are truthy, FILES_PATH otherwise
+    """
+    if not (is_private and private_space):
+        tree_root = FILES_PATH
+    else:
+        tree_root = f"{PRIVATE_PATH}/{private_space}/_files"
+    return "/".join((tree_root, f"{relative_path}")).rstrip("/")
+
+
+def get_outputs_path(relative_path, is_private=False, private_space=None):
+    """
+    Build the location of an item inside the outputs (exports) tree.
+
+    :param relative_path: path of the item relative to the tree root
+    :param is_private: True when the item belongs to a private space
+    :param private_space: ID of that private space, if any
+    :return: '<outputs root>/<relative_path>' without a trailing slash; the
+        root is the private space's '_outputs' folder when both is_private
+        and private_space are truthy, OUTPUTS_PATH otherwise
+    """
+    tree_root = OUTPUTS_PATH
+    if is_private and private_space:
+        tree_root = f"{PRIVATE_PATH}/{private_space}/_outputs"
+    full_path = f"{tree_root}/{relative_path}"
+    return full_path.rstrip("/")
+
+
+def get_ner_file(files_path, outputs_path):
+ """
+ Request NER entities from the text file and save to outputs.
+
+ :param files_path: path to document folder in _files (for reading _data.json if needed)
+ :param outputs_path: path to document folder in _outputs (for reading txt and writing entities)
+ :return: True if successful, False otherwise
+ """
+ txt_file_path = f"{outputs_path}/_txt.txt"
+ if not os.path.exists(txt_file_path):
+ return False
+
+ with open(txt_file_path, "rb") as file:
r = requests.post(
"https://iris.sysresearch.org/anonimizador/from-text",
files={"file": file},
@@ -66,7 +161,7 @@ def get_ner_file(path):
return False
if r.status_code == 200:
- with open(f"{path}/_export/_entities.json", "w", encoding="utf-8") as f:
+ with open(f"{outputs_path}/_entities.json", "w", encoding="utf-8") as f:
json.dump(ner, f, indent=2, ensure_ascii=False)
return True
else:
@@ -261,18 +356,27 @@ def delete_structure(client, path):
delete_structure(client, folder)
-# TODO
def get_filesystem(path, private_space: str = None, is_private: bool = False) -> dict:
"""
- :param path: path to the folder
+ Get the filesystem structure starting from INPUTS_PATH.
+
+ :param path: path to the folder (relative or in _inputs)
:param private_space: name of the private space, if applicable
:param is_private: whether the target path is a private space
"""
- files = get_structure(path, private_space, is_private)
- info = get_structure_info(path, private_space, is_private)
+ # Determine the inputs path for structure and files path for metadata
+ if is_private and private_space:
+ inputs_base = f"{PRIVATE_PATH}/{private_space}/_inputs"
+ files_base = f"{PRIVATE_PATH}/{private_space}/_files"
+ else:
+ inputs_base = INPUTS_PATH
+ files_base = FILES_PATH
+
+ files = get_structure(inputs_base, files_base, private_space, is_private)
+ info = get_structure_info(inputs_base, files_base, private_space, is_private)
if files is None:
- if path != FILES_PATH and PRIVATE_PATH not in path:
+ if path != INPUTS_PATH and PRIVATE_PATH not in path:
files = {path: []}
else:
files = {"files": []}
@@ -323,24 +427,45 @@ def get_ocr_size(path):
return f"{size / 1024 ** 3:.2f} GB"
-def get_document_files_size(path, extension=None, from_api: bool = False):
+def get_document_files_size(files_path, inputs_path=None, outputs_path=None, extension=None, from_api: bool = False):
     """
- Get the total size of files related to a document,
- which are the original copy of the file and result files inside /_export.
- :param path: path to the document folder
- :param extension: extension in the original file, used in the case of documents from the API
+    Get the total size of files related to a document across all three folders.
+
+    Symbolic links are not counted. Every file is counted once: for API
+    documents the original lives inside files_path, so it is excluded from
+    the walk of that folder after being added explicitly.
+
+    :param files_path: path to document folder in _files (metadata/processing)
+    :param inputs_path: path to original file in _inputs (optional; ignored when missing or not a regular file)
+    :param outputs_path: path to document folder in _outputs (optional; skipped when not given)
+    :param extension: extension of the original file, used for API documents
     :param from_api: whether the method is being called for a file from the API
     :return: total size in bytes
     """
- original_path = (
- f"{path}/{get_file_basename(path)}.{extension}" if from_api else path
- )
- size = get_file_size(original_path, path_complete=from_api) # original file's size
- for dirpath, folders, filenames in os.walk(f"{path}/_export"):
- for f in filenames:
- subpath = os.path.join(dirpath, f)
- if not os.path.islink(subpath):
- size += os.path.getsize(subpath)
+
+    def _tree_size(root, skip=None):
+        # Sum the sizes of non-symlink files under root; `skip` is a
+        # normalised path that was already counted by the caller.
+        # os.walk yields nothing for a missing root, so that case adds 0.
+        total = 0
+        for dirpath, _folders, filenames in os.walk(root):
+            for filename in filenames:
+                subpath = os.path.join(dirpath, filename)
+                if os.path.islink(subpath) or os.path.normpath(subpath) == skip:
+                    continue
+                total += os.path.getsize(subpath)
+        return total
+
+    size = 0
+    counted_original = None
+
+    # Size of original file in _inputs
+    if inputs_path and os.path.exists(inputs_path):
+        if os.path.isfile(inputs_path):
+            size += os.path.getsize(inputs_path)
+    elif from_api:
+        # API files have the original inside the _files path
+        original_path = f"{files_path}/{get_file_basename(files_path)}.{extension}"
+        if os.path.exists(original_path):
+            size += os.path.getsize(original_path)
+            # Remember it so the _files walk below does not add it again.
+            counted_original = os.path.normpath(original_path)
+
+    # Size of metadata/processing files in _files
+    size += _tree_size(files_path, skip=counted_original)
+
+    # Size of output files in _outputs
+    if outputs_path:
+        size += _tree_size(outputs_path)
+
     return size
@@ -359,28 +484,35 @@ def get_folder_size(path):
return size
-def get_file_size(path, path_complete=False):
+def get_file_size(path, path_complete=True):
"""
Returns the file's size.
+
:param path: path to the file
- :param path_complete: whether the path is complete;
- if not, seeks the file contained within the target folder which shares its name
+ :param path_complete: whether the path points directly to a file;
+ if False, assumes path is a folder and looks for a file with the folder's name inside it
:return: file size in bytes
"""
if not path_complete:
name = path.split("/")[-1]
path = f"{path}/{name}"
+ if not os.path.exists(path):
+ return 0
return os.path.getsize(path)
-def get_folder_info(path, private_space=None):
+def get_folder_info(inputs_path, files_path, private_space=None, is_private=False):
"""
- Get the info of the folder
- :param path: path to the folder
+ Get the info of the folder.
+
+ :param inputs_path: path to the folder in _inputs (for listing contents)
+ :param files_path: path to the folder in _files (for metadata)
+ :param private_space: name of the private space if applicable
+ :param is_private: whether this is a private space
"""
info = {}
try:
- data = get_data(f"{path}/_data.json")
+ data = get_data(f"{files_path}/_data.json")
except (FileNotFoundError, JSONDecodeError):
return {}
@@ -390,55 +522,123 @@ def get_folder_info(path, private_space=None):
if data["type"] == "folder":
n_subfolders = 0
n_docs = 0
- for content in os.scandir(path):
- if content.is_dir() and not content.name.startswith("_"):
- content_data = get_data(f"{path}/{content.name}/_data.json")
- if "type" in content_data:
- if content_data["type"] == "folder":
- n_subfolders += 1
- elif content_data["type"] == "file":
- n_docs += 1
+ # Scan contents from _inputs path
+ if os.path.exists(inputs_path):
+ for content in os.scandir(inputs_path):
+ if content.is_dir() and not content.name.startswith("_"):
+ # Check metadata in _files path
+ content_files_path = f"{files_path}/{content.name}"
+ try:
+ content_data = get_data(f"{content_files_path}/_data.json")
+ if "type" in content_data:
+ if content_data["type"] == "folder":
+ n_subfolders += 1
+ elif content_data["type"] == "file":
+ n_docs += 1
+ except (FileNotFoundError, JSONDecodeError):
+ # Check if it's a file (file in inputs, folder in files)
+ if content.is_file():
+ n_docs += 1
+ else:
+ n_subfolders += 1
+ elif content.is_file() and not content.name.startswith("_"):
+ # This is a document (file in _inputs)
+ n_docs += 1
data["contents"] = {"documents": n_docs, "subfolders": n_subfolders}
+ # Calculate folder size from _files path (metadata/processing data)
folder_size = 0
- dirs_dict = {}
- # traverse bottom-up adding subdirectory sizes
- for root, dirs, files in os.walk(path, topdown=False):
- # sum directory file sizes
- size = sum(os.path.getsize(os.path.join(root, name)) for name in files)
- # sum subdirectory sizes
- subdir_size = sum(dirs_dict[os.path.join(root, d)] for d in dirs)
- # store size of current directory and update total size
- folder_size = dirs_dict[root] = size + subdir_size
+ if os.path.exists(files_path):
+ dirs_dict = {}
+ for root, dirs, files in os.walk(files_path, topdown=False):
+ size = sum(os.path.getsize(os.path.join(root, name)) for name in files)
+ subdir_size = sum(dirs_dict.get(os.path.join(root, d), 0) for d in dirs)
+ folder_size = dirs_dict[root] = size + subdir_size
data["size"] = size_to_units(folder_size)
- # sanitize important paths from the info key
- path = (
- path.replace(f"{PRIVATE_PATH}/{private_space}", "")
- .replace(PRIVATE_PATH, "")
- .replace(FILES_PATH, "")
- .strip("/")
- )
- info[path] = data
+ # Check if folder is in queue (active, queued, or finished)
+ if data.get("type") == "folder":
+ try:
+ from src.utils.system_settings import get_system_settings
+ import time
+
+ settings = get_system_settings()
+ active_folders = settings.get("active_folders", [])
+ queued_folders = settings.get("queued_folders", [])
+ finished_folders = settings.get("finished_folders", [])
+
+ # Check if this folder is active
+ for folder in active_folders:
+ if folder.get("path") == files_path:
+ started_at = folder.get("started_at", 0)
+ duration = int(time.time() - started_at) if started_at else 0
+ data["queue_status"] = {
+ "state": "active",
+ "folder_id": folder.get("id"),
+ "duration_seconds": duration
+ }
+ break
+ else:
+ # Check if this folder is queued
+ for idx, folder in enumerate(queued_folders, 1):
+ if folder.get("path") == files_path:
+ queued_at = folder.get("queued_at", 0)
+ wait_time = int(time.time() - queued_at) if queued_at else 0
+ data["queue_status"] = {
+ "state": "queued",
+ "folder_id": folder.get("id"),
+ "position": idx,
+ "wait_time_seconds": wait_time
+ }
+ break
+ else:
+ # Check if this folder recently finished
+ for folder in finished_folders:
+ if folder.get("path") == files_path:
+ data["queue_status"] = {
+ "state": "finished",
+ "folder_id": folder.get("id"),
+ "completed_at": int(folder.get("completed_at", 0))
+ }
+ break
+ except Exception as e:
+ # If there's any error checking queue status, just skip it
+ import logging as log
+ log.debug(f"Could not check folder queue status: {e}")
+
+ # Sanitize important paths from the info key to get relative path
+ if is_private and private_space:
+ relative_path = files_path.replace(f"{PRIVATE_PATH}/{private_space}/_files", "").strip("/")
+ else:
+ relative_path = files_path.replace(FILES_PATH, "").strip("/")
+
+ info[relative_path] = data
return info
-def get_structure_info(path, private_space=None, is_private=False):
+def get_structure_info(inputs_base, files_base, private_space=None, is_private=False):
"""
- Get the info of each file/folder
+ Get the info of each file/folder by walking _inputs and reading metadata from _files.
+
+ :param inputs_base: base path in _inputs to walk
+ :param files_base: base path in _files for metadata
+ :param private_space: name of private space if applicable
+ :param is_private: whether this is a private space
"""
- if not is_private and PRIVATE_PATH in path:
+ if not is_private and PRIVATE_PATH in inputs_base:
raise FileNotFoundError
- if API_TEMP_PATH in path:
+ if API_TEMP_PATH in inputs_base:
raise FileNotFoundError
info = {}
- for root, folders, _ in os.walk(path, topdown=True):
+ # Walk the _inputs tree
+ for root, folders, files in os.walk(inputs_base, topdown=True):
root = root.replace("\\", "/")
- # ignore reserved folders by pruning them from search tree
- folders[:] = [f for f in folders if not f.startswith("_")]
- if root.split("/")[-1].startswith("_"):
+ # ignore reserved and hidden folders by pruning them from search tree
+ folders[:] = [f for f in folders if not f.startswith("_") and not f.startswith(".")]
+ # Don't skip the base folder itself (e.g., _inputs at root of private space)
+ if root != inputs_base and (root.split("/")[-1].startswith("_") or root.split("/")[-1].startswith(".")):
continue
# ignore possible private path folders
if not is_private and (PRIVATE_PATH in root or root in PRIVATE_PATH.split("/")):
@@ -447,15 +647,32 @@ def get_structure_info(path, private_space=None, is_private=False):
if is_private and f"{PRIVATE_PATH}/{private_space}" not in root:
continue
- folder_path = root.replace("\\", "/")
- folder_info = get_folder_info(folder_path, private_space)
+ # Calculate the relative path from inputs_base
+ relative_path = root.replace(inputs_base, "").strip("/")
+ files_path = f"{files_base}/{relative_path}".rstrip("/")
+
+ # Get folder info using both inputs and files paths
+ folder_info = get_folder_info(root, files_path, private_space, is_private)
info = {**info, **folder_info}
+
+ # Also get info for files (documents) in this folder
+ for filename in files:
+ if filename.startswith("_") or filename.startswith("."):
+ continue
+ # For documents, the file is in _inputs, metadata folder is in _files
+ doc_inputs_path = f"{root}/{filename}"
+ doc_files_path = f"{files_path}/{filename}"
+ doc_info = get_folder_info(doc_inputs_path, doc_files_path, private_space, is_private)
+ info = {**info, **doc_info}
+
return info
-def get_structure(path, private_space=None, is_private=False):
+def get_structure(inputs_path, files_path, private_space=None, is_private=False):
"""
- Put the file system structure in a dict
+ Build the file system structure from _inputs tree with metadata from _files.
+
+ Returns a dict like:
{
'files': [
{
@@ -467,51 +684,82 @@ def get_structure(path, private_space=None, is_private=False):
]
}
- :param path: the path to the files
+ :param inputs_path: path in _inputs to read structure from
+ :param files_path: corresponding path in _files for metadata
+ :param private_space: name of private space if applicable
+ :param is_private: whether this is a private space
"""
- if not is_private and PRIVATE_PATH in path:
+ if not is_private and PRIVATE_PATH in inputs_path:
raise FileNotFoundError
- if API_TEMP_PATH in path:
+ if API_TEMP_PATH in inputs_path:
raise FileNotFoundError
filesystem = {}
- if path == FILES_PATH or path == f"{PRIVATE_PATH}/{private_space}":
+
+ # Determine if this is a root folder
+ if is_private and private_space:
+ is_root = inputs_path == f"{PRIVATE_PATH}/{private_space}/_inputs"
+ else:
+ is_root = inputs_path == INPUTS_PATH
+
+ if is_root:
name = "files"
else:
- name = path.split("/")[-1]
+ name = inputs_path.split("/")[-1]
+ # Check if this is a document (file in _inputs, folder in _files)
+ if os.path.isfile(inputs_path):
+ # This is a document file
+ return name
+
+ # Check metadata in _files for folders
try:
- data = get_data(f"{path}/_data.json")
+ data = get_data(f"{files_path}/_data.json")
+ if "type" not in data:
+ return None
+ if data["type"] == "file":
+ return name
except (FileNotFoundError, JSONDecodeError):
- return None
+ # No metadata yet, treat as regular folder
+ pass
- if "type" not in data:
- return None
- if data["type"] == "file":
- return name
+ if not os.path.exists(inputs_path):
+ return None
contents = []
- # ignore reserved folders that start with '_'
- folders = sorted(
- [
- f
- for f in os.listdir(path)
- if os.path.isdir(f"{path}/{f}") and not f.startswith("_")
- ]
- )
- for folder in folders:
+
+ # List all items in inputs_path (both files and folders)
+ # Filter out hidden files (starting with "." or "_")
+ items = sorted([
+ f for f in os.listdir(inputs_path)
+ if not f.startswith("_") and not f.startswith(".")
+ ])
+
+ for item in items:
+ item_inputs_path = f"{inputs_path}/{item}"
+ item_files_path = f"{files_path}/{item}"
+
# ignore possible private path folders
- if not is_private and folder in PRIVATE_PATH.split("/"):
+ if not is_private and item in PRIVATE_PATH.split("/"):
continue
- # if in a private space, ignore folders not from this private space
- if is_private and f"{PRIVATE_PATH}/{private_space}" not in f"{path}/{folder}":
+ # if in a private space, ignore items not from this private space
+ if is_private and f"{PRIVATE_PATH}/{private_space}" not in item_inputs_path:
continue
- folder = f"{path}/{folder}"
- file = get_structure(folder, private_space, is_private)
+ # Only include items that have metadata in _files (have been synced)
+ item_data_path = f"{item_files_path}/_data.json"
+ if not os.path.exists(item_data_path):
+ # Item not synced yet - don't show in file list
+ continue
- if file is not None:
- contents.append(file)
+ if os.path.isfile(item_inputs_path):
+ # This is a document file - just add the filename
+ contents.append(item)
+ elif os.path.isdir(item_inputs_path):
+ # This is a folder - recurse
+ result = get_structure(item_inputs_path, item_files_path, private_space, is_private)
+ if result is not None:
+ contents.append(result)
filesystem[name] = contents
return filesystem
@@ -627,6 +875,52 @@ def get_doc_len(file) -> int:
return int(json.loads(text)["pages"])
+def get_inherited_config(files_path, is_private=False):
+    """
+    Get OCR configuration for a file/folder, checking parent folders if needed.
+    Walks up the folder hierarchy and returns the first configuration that is
+    present and not the string "default".
+
+    :param files_path: path to the file/folder in _files
+    :param is_private: whether this is in a private space
+    :return: the "config" value found in the nearest _data.json, or None when
+        no folder between files_path and the root (exclusive) defines one
+    """
+    # Determine the root path to stop at
+    if is_private:
+        # get_files_path() places private documents under
+        # "{PRIVATE_PATH}/{space_id}/_files/...". The "{FILES_PATH}/{PRIVATE_PATH}/..."
+        # form is still honoured for paths that use it; requiring it for every
+        # private path would make the walk below never start for the former.
+        nested_root = f"{FILES_PATH}/{PRIVATE_PATH}"
+        root_marker = nested_root if files_path.startswith(nested_root) else PRIVATE_PATH
+    else:
+        # For public files, stop at _files root
+        root_marker = FILES_PATH
+
+    current_path = files_path
+
+    # Walk up the folder hierarchy
+    while current_path and current_path.startswith(root_marker):
+        try:
+            data = get_data(f"{current_path}/_data.json")
+        except (FileNotFoundError, JSONDecodeError):
+            # No data file or invalid JSON at this level: keep climbing
+            data = {}
+
+        if "config" in data and data["config"] != "default":
+            return data["config"]
+
+        parent = os.path.dirname(current_path)
+
+        # Stop if we've reached the root or can't go higher
+        if parent == current_path or parent == root_marker or not parent.startswith(root_marker):
+            break
+
+        current_path = parent
+
+    # No config found in hierarchy
+    return None
+
+
def update_json_file(file, data, lock=None):
"""
Update the JSON data contained in the file.
diff --git a/server/src/utils/image_compression.py b/server/src/utils/image_compression.py
new file mode 100644
index 00000000..5e283eba
--- /dev/null
+++ b/server/src/utils/image_compression.py
@@ -0,0 +1,614 @@
+import gc
+import os
+import io
+from typing import Optional, Tuple, Any, Dict
+
+import numpy as np
+from PIL import Image, ImageFilter, ImageSequence
+import fitz # PyMuPDF
+
+# OpenCV is required: both mask methods (the default "sauvola" and "cv") are implemented with cv2
+import cv2
+
+
+# -------------------------------
+# 1. MRC segmentation – PIL-based (kept as optional fallback)
+# -------------------------------
+
+def segment_page_to_mrc_components_pil(pil_img: Image.Image) -> Tuple[Image.Image, Image.Image, Image.Image]:
+    """
+    Split a page into MRC layers using only PIL and NumPy.
+
+    A pixel is foreground when its contrast-stretched grey value differs from
+    a Gaussian-blurred copy of the page by more than an adaptive threshold.
+
+    :param pil_img: page image; converted to RGB internally
+    :return: (bg_img, fg_img, mask_img) where
+        bg_img is the Gaussian-blurred RGB page,
+        fg_img is the RGB page with every non-foreground pixel set to white,
+        mask_img is an "L" image holding 255 for foreground and 0 elsewhere
+    """
+    page_rgb = pil_img.convert("RGB")
+
+    # Stretch the 2nd..98th percentile range of the grey levels to 0..255.
+    levels = np.asarray(page_rgb.convert("L"), dtype=np.float32)
+    low, high = np.percentile(levels, (2, 98))
+    if high > low:
+        levels = (levels - low) * (255.0 / (high - low))
+    levels = np.clip(levels, 0, 255)
+    gray = Image.fromarray(levels.astype(np.uint8), mode="L")
+
+    blurred_gray = gray.filter(ImageFilter.GaussianBlur(radius=5))
+    contrast = np.abs(
+        np.asarray(gray, dtype=np.float32) - np.asarray(blurred_gray, dtype=np.float32)
+    )
+
+    # The threshold follows the page statistics but never drops below 20.
+    cutoff = max(20.0, contrast.mean() + 1.5 * contrast.std())
+    candidate = Image.fromarray((contrast > cutoff).astype(np.uint8) * 255, mode="L")
+
+    # 3x3 median filter applied to the raw mask before re-binarising it.
+    despeckled = candidate.filter(ImageFilter.MedianFilter(size=3))
+    is_foreground = np.asarray(despeckled, dtype=np.uint8) > 127
+
+    mask_img = Image.fromarray(np.where(is_foreground, 255, 0).astype(np.uint8), mode="L")
+    bg_img = page_rgb.filter(ImageFilter.GaussianBlur(radius=5))
+
+    # Foreground layer: original colours where the mask is set, white elsewhere.
+    fg_pixels = np.array(page_rgb, dtype=np.uint8)
+    fg_pixels[~is_foreground] = 255
+    fg_img = Image.fromarray(fg_pixels, mode="RGB")
+
+    return bg_img, fg_img, mask_img
+
+def make_inpainted_background_cv(
+    pil_img: Image.Image,
+    mask_img: Image.Image,
+    inpaint_method: str = "telea",  # "telea" | "ns" | "masked_blur"
+    inpaint_radius: int = 3,
+    dilate_px: int = 1,
+    post_blur_radius: float = 1.5,
+) -> Image.Image:
+    """
+    Build a background layer in which the masked pixels have been filled in.
+
+    :param pil_img: page image; converted to RGB internally
+    :param mask_img: mask image; converted to "L" and binarised at > 127
+    :param inpaint_method: "masked_blur" copies masked pixels from a Gaussian
+        blur of the page; "telea" uses cv2.INPAINT_TELEA; any other value
+        uses cv2.INPAINT_NS
+    :param inpaint_radius: radius handed to cv2.inpaint
+    :param dilate_px: when > 0, the mask is dilated with an elliptical kernel
+        of side 2 * dilate_px + 1 before filling
+    :param post_blur_radius: when truthy and > 0, radius of a final Gaussian blur
+    :return: the filled RGB image
+    """
+    pixels = np.asarray(pil_img.convert("RGB"), dtype=np.uint8)
+    raw_mask = np.asarray(mask_img.convert("L"), dtype=np.uint8)
+    hole_mask = (raw_mask > 127).astype(np.uint8) * 255
+
+    if dilate_px > 0:
+        side = 2 * dilate_px + 1
+        ellipse = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (side, side))
+        hole_mask = cv2.dilate(hole_mask, ellipse, iterations=1)
+
+    if inpaint_method == "masked_blur":
+        # Fast baseline, not true inpainting: masked pixels come from the blur.
+        blurred = cv2.GaussianBlur(pixels, (0, 0), sigmaX=5, sigmaY=5)
+        filled = np.where((hole_mask > 0)[..., None], blurred, pixels)
+    else:
+        # OpenCV inpaint expects BGR
+        algorithm = cv2.INPAINT_TELEA if inpaint_method == "telea" else cv2.INPAINT_NS
+        repaired_bgr = cv2.inpaint(
+            cv2.cvtColor(pixels, cv2.COLOR_RGB2BGR),
+            hole_mask,
+            inpaintRadius=float(inpaint_radius),
+            flags=algorithm,
+        )
+        filled = cv2.cvtColor(repaired_bgr, cv2.COLOR_BGR2RGB)
+
+    result = Image.fromarray(filled, mode="RGB")
+    if post_blur_radius and post_blur_radius > 0:
+        return result.filter(ImageFilter.GaussianBlur(radius=post_blur_radius))
+    return result
+
+# -------------------------------
+# 1b. CV-based mask + segmentation (DEFAULT)
+# -------------------------------
+
+def detect_text_mask_cv(
+    img_rgb: np.ndarray,
+    win_size: int = 35,
+    C: int = 10,
+    morph_kernel: int = 3,
+) -> np.ndarray:
+    """
+    OpenCV-based text mask using an inverted adaptive Gaussian threshold.
+
+    NOTE(review): win_size, C and morph_kernel do not influence the result.
+    The threshold is called with the literal block size 75 and constant 10,
+    and no morphology is applied. Confirm whether the literals are intended
+    before wiring the parameters in.
+
+    :param img_rgb: HxWx3 RGB uint8 array
+    :param win_size: accepted for interface compatibility (see note)
+    :param C: accepted for interface compatibility (see note)
+    :param morph_kernel: accepted for interface compatibility (see note)
+    :return: uint8 mask, 255 for kept foreground components and 0 elsewhere
+    """
+    gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
+
+    # Divide by a heavily blurred copy of the page, then stretch to 0..255.
+    blurred = cv2.GaussianBlur(gray, (0, 0), sigmaX=15, sigmaY=15)
+    flattened = cv2.divide(gray, blurred, scale=255)
+    flattened = cv2.normalize(flattened, None, 0, 255, cv2.NORM_MINMAX)
+
+    binary = cv2.adaptiveThreshold(
+        flattened,
+        255,
+        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
+        cv2.THRESH_BINARY_INV,
+        75,
+        10,
+    )
+
+    n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(binary, connectivity=8)
+    areas = stats[1:, cv2.CC_STAT_AREA]
+    smallest_kept = max(2, (img_rgb.shape[0] * img_rgb.shape[1]) // 200000)
+
+    # Per-label lookup table: label 0 (background) and undersized components
+    # map to 0, every other component maps to 255.
+    label_value = np.zeros(n_labels, dtype=np.uint8)
+    label_value[1:][areas >= smallest_kept] = 255
+    return label_value[labels]
+
+
+def detect_text_mask_sauvola(
+    img_rgb: np.ndarray,
+    win: int = 51,
+    k: float = 0.2,
+    R: float = 128.0,
+) -> np.ndarray:
+    """
+    Single-window Sauvola binarisation.
+
+    :param img_rgb: HxWx3 RGB uint8 array
+    :param win: side of the square local window; forced to an odd value
+    :param k: weight of the local standard deviation term
+    :param R: value the local standard deviation is divided by
+    :return: uint8 mask, 255 for kept foreground components and 0 elsewhere
+    """
+    gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
+
+    # Divide by a heavily blurred copy of the page, then stretch to 0..255.
+    blurred = cv2.GaussianBlur(gray, (0, 0), sigmaX=15, sigmaY=15)
+    flattened = cv2.divide(gray, blurred, scale=255)
+    flattened = cv2.normalize(flattened, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
+
+    height, width = flattened.shape
+    values = flattened.astype(np.float64)
+
+    win |= 1
+    window = (win, win)
+    local_mean = cv2.boxFilter(values, ddepth=-1, ksize=window,
+                               borderType=cv2.BORDER_REFLECT)
+    local_sq_mean = cv2.boxFilter(np.square(values), ddepth=-1, ksize=window,
+                                  borderType=cv2.BORDER_REFLECT)
+    # Clamp at zero: rounding can push the variance slightly negative.
+    local_std = np.sqrt(np.maximum(local_sq_mean - local_mean * local_mean, 0.0))
+
+    threshold = local_mean * (1.0 + k * (local_std / R - 1.0))
+    binary = np.where(values < threshold, 255, 0).astype(np.uint8)
+
+    n_labels, labels, stats, _ = cv2.connectedComponentsWithStats(binary, connectivity=8)
+    areas = stats[1:, cv2.CC_STAT_AREA]
+    smallest_kept = max(2, (height * width) // 200000)
+
+    # Per-label lookup table: label 0 (background) and undersized components
+    # map to 0, every other component maps to 255.
+    label_value = np.zeros(n_labels, dtype=np.uint8)
+    label_value[1:][areas >= smallest_kept] = 255
+    return label_value[labels]
+
+
+def segment_page_to_mrc_components_cv(
+    pil_img: Image.Image,
+    win_size: int = 35,
+    C: int = 10,
+    morph_kernel: int = 3,
+    mask_method: str = "sauvola",
+    inpaint_bg: bool = False,
+    inpaint_method: str = "telea",
+    inpaint_radius: int = 3,
+    inpaint_dilate_px: int = 1,
+    inpaint_post_blur: float = 1.5,
+    input_dpi: float = 0.0,
+    sauvola_max_dpi: float = 300.0,
+) -> Tuple[Image.Image, Image.Image, Image.Image]:
+    """
+    CV-based segmentation of a page into MRC layers.
+
+    mask_method selects the binarisation algorithm:
+        "sauvola" - Sauvola thresholding (default); when input_dpi exceeds a
+                    positive sauvola_max_dpi, the mask is computed on a copy
+                    downscaled by sauvola_max_dpi / input_dpi and resized
+                    back with nearest-neighbour interpolation
+        any other - adaptive Gaussian threshold (detect_text_mask_cv)
+
+    When inpaint_bg is True the background comes from
+    make_inpainted_background_cv; otherwise it is a Gaussian blur (radius 5)
+    of the page.
+
+    NOTE(review): fg_img is returned as an unmodified copy of the page; the
+    mask is not applied to it, unlike the PIL variant, which whitens
+    non-mask pixels. Confirm which behaviour downstream code expects.
+
+    :return: (bg_img, fg_img, mask_img) - RGB background, RGB foreground,
+        and an "L" mask holding 0 or 255
+    """
+    page_rgb = pil_img.convert("RGB")
+    pixels = np.asarray(page_rgb, dtype=np.uint8)
+    page_h, page_w = pixels.shape[:2]
+
+    if mask_method != "sauvola":
+        mask_np = detect_text_mask_cv(pixels, win_size=win_size, C=C, morph_kernel=morph_kernel)
+    else:
+        working = pixels
+        if input_dpi > sauvola_max_dpi > 0:
+            shrink = sauvola_max_dpi / input_dpi
+            reduced_size = (
+                max(1, int(round(page_w * shrink))),
+                max(1, int(round(page_h * shrink))),
+            )
+            working = cv2.resize(pixels, reduced_size, interpolation=cv2.INTER_AREA)
+        mask_np = detect_text_mask_sauvola(working)
+        if mask_np.shape[:2] != (page_h, page_w):
+            mask_np = cv2.resize(mask_np, (page_w, page_h),
+                                 interpolation=cv2.INTER_NEAREST)
+
+    # Force strict 0/255 values whichever detector produced the mask.
+    mask_np = np.where(mask_np > 0, 255, 0).astype(np.uint8)
+    mask_img = Image.fromarray(mask_np, mode="L")
+
+    if not inpaint_bg:
+        bg_img = page_rgb.filter(ImageFilter.GaussianBlur(radius=5))
+    else:
+        bg_img = make_inpainted_background_cv(
+            page_rgb,
+            mask_img,
+            inpaint_method=inpaint_method,
+            inpaint_radius=inpaint_radius,
+            dilate_px=inpaint_dilate_px,
+            post_blur_radius=inpaint_post_blur,
+        )
+
+    fg_img = Image.fromarray(pixels.copy(), mode="RGB")
+
+    return bg_img, fg_img, mask_img
+
+
+# -------------------------------
+# 2. PDF assembly helpers
+# -------------------------------
+
+def encode_pil_to_bytes(img: Image.Image, fmt: str, **save_params) -> bytes:
+    """
+    Serialise an image into an in-memory buffer and return the raw bytes.
+
+    :param img: image to encode
+    :param fmt: format name forwarded to img.save (e.g. "JPEG")
+    :param save_params: extra keyword arguments forwarded to img.save
+    :return: the encoded bytes
+    """
+    with io.BytesIO() as buffer:
+        img.save(buffer, format=fmt, **save_params)
+        return buffer.getvalue()
+
+
+def _scale_cv_params_for_dpi(
+    input_dpi: float,
+    base_dpi: float,
+    win_size: int,
+    C: int,
+    morph_kernel: int,
+):
+    """
+    Rescale thresholding parameters from base_dpi to input_dpi.
+
+    The window and kernel scale linearly with the DPI ratio, C with its
+    square root.
+
+    :return: (window, C, kernel) - window is odd and at least 3; C and
+        kernel are at least 1
+    """
+    ratio = input_dpi / base_dpi
+    scaled_window = max(3, int(round(win_size * ratio)) | 1)
+    scaled_kernel = max(1, int(round(morph_kernel * ratio)))
+    scaled_c = max(1, int(round(C * ratio ** 0.5)))
+    return scaled_window, scaled_c, scaled_kernel
+
+
+def _iter_pil_pages_from_pdf_bytes(pdf_bytes: bytes, render_dpi: int):
+    """
+    Render every page of an in-memory PDF and yield it as an RGB PIL image.
+
+    The document is closed when iteration finishes or the generator is closed.
+
+    :param pdf_bytes: raw PDF content
+    :param render_dpi: resolution passed to page.get_pixmap
+    """
+    document = fitz.open(stream=pdf_bytes, filetype="pdf")
+    try:
+        for pdf_page in document:
+            pixmap = pdf_page.get_pixmap(dpi=render_dpi)
+            dimensions = (pixmap.width, pixmap.height)
+            if pixmap.alpha == 0:
+                yield Image.frombytes("RGB", dimensions, pixmap.samples)
+            else:
+                # Samples carry an alpha channel; decode as RGBA, then drop it.
+                yield Image.frombytes("RGBA", dimensions, pixmap.samples).convert("RGB")
+    finally:
+        document.close()
+
+
+def _iter_pil_pages_from_tiff_bytes(tiff_bytes: bytes):
+    """
+    Yield every frame of an in-memory (possibly multi-page) TIFF as an RGB image.
+
+    The source image is closed once iteration finishes or the generator is
+    closed, mirroring how the PDF iterator releases its document. Each
+    yielded frame is an independent converted copy.
+
+    :param tiff_bytes: raw TIFF content
+    """
+    with Image.open(io.BytesIO(tiff_bytes)) as tiff_img:
+        for pil_page in ImageSequence.Iterator(tiff_img):
+            yield pil_page.convert("RGB")
+
+
+def _read_image_dpi(image_bytes: bytes, fallback: float) -> float:
+    """
+    Read the horizontal DPI from image metadata.
+
+    :param image_bytes: raw encoded image (e.g. JPEG or PNG)
+    :param fallback: value returned when the metadata has no "dpi" entry,
+        the entry is not a finite positive number, or the bytes cannot be
+        read as an image
+    :return: the first component of the "dpi" entry as a float, or fallback
+    """
+    import math
+
+    try:
+        # Close the image as soon as the metadata has been read.
+        with Image.open(io.BytesIO(image_bytes)) as img:
+            dpi_info = img.info.get("dpi")
+        if dpi_info:
+            dpi_val = float(dpi_info[0])
+            # inf or nan would otherwise leak into later scale computations.
+            if math.isfinite(dpi_val) and dpi_val > 0:
+                return dpi_val
+    except Exception:
+        # Deliberately best-effort: any failure means "DPI unknown".
+        pass
+    return fallback
+
+
+def _iter_pil_pages_from_image_bytes(image_bytes: bytes):
+    """
+    Yield a single RGB PIL image from JPEG or PNG bytes.
+
+    The decoded source is closed before the converted copy is yielded, so
+    nothing stays open while the consumer holds the page.
+
+    :param image_bytes: raw encoded image
+    """
+    with Image.open(io.BytesIO(image_bytes)) as img:
+        page = img.convert("RGB")
+    yield page
+
+
+def jpeg_roundtrip_pil(img: Image.Image, quality: int, *, subsampling=2, optimize=True, progressive=True) -> Image.Image:
+    """
+    Push an image through JPEG compression and return the decoded result.
+
+    :param img: source image; converted to RGB before encoding
+    :param quality: JPEG quality, coerced with int()
+    :param subsampling: 0=4:4:4, 1=4:2:2, 2=4:2:0 (common/smaller)
+    :param optimize: forwarded to the JPEG encoder
+    :param progressive: forwarded to the JPEG encoder
+    :return: the re-decoded image in RGB mode
+    """
+    rgb_source = img.convert("RGB")
+    jpeg_options = {
+        "quality": int(quality),
+        "subsampling": subsampling,
+        "optimize": optimize,
+        "progressive": progressive,
+    }
+    encoded = encode_pil_to_bytes(rgb_source, "JPEG", **jpeg_options)
+    decoded = Image.open(io.BytesIO(encoded))
+    return decoded.convert("RGB")
+
+
+def _build_pdfa2b_xmp() -> str:
+ return (
+ ''
+ '