Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions docling/backend/html_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
from bs4 import BeautifulSoup, NavigableString, PageElement, Tag
from bs4.element import PreformattedString
from docling_core.types.doc import (
BoundingBox,
DocItem,
DocItemLabel,
DoclingDocument,
DocumentOrigin,
GroupItem,
GroupLabel,
PictureItem,
ProvenanceItem,
RefItem,
RichTableCell,
TableCell,
Expand Down Expand Up @@ -501,6 +503,63 @@ def parse_table_data(
doc.add_table_cell(table_item=docling_table, cell=simple_cell)
return data

# --- provenance helpers -------------------------------------------------

def _bs4_xpath(self, el: Tag) -> ProvenanceItem:
    """Build a provenance item carrying an XPath-like path for a Tag.

    The path is computed without lxml, in the form
    ``/html/body/div[3]/p[2]``, using a 1-based index among same-tag
    element siblings (elements only, text nodes ignored).

    Args:
        el: The BeautifulSoup Tag to locate within its document.

    Returns:
        A ProvenanceItem whose ``xpath`` field holds the computed path.
        ``page_no``, ``charspan``, and ``bbox`` are placeholders, since
        HTML input carries no layout information.
    """
    parts: list[str] = []
    cur = el
    while isinstance(cur, Tag):
        tag = cur.name.lower() if isinstance(cur.name, str) else str(cur.name)
        parent = cur.parent
        if not isinstance(parent, Tag):
            # Reached the document root: the parent is the soup object
            # (or None), so this is the outermost element.
            parts.append(f"/{tag}[1]")
            break
        # 1-based index among the parent's direct children that share
        # this tag name. EAFP: index() raises rather than scanning twice.
        sibs = parent.find_all(tag, recursive=False)
        try:
            idx = sibs.index(cur) + 1
        except ValueError:
            idx = 1
        parts.append(f"/{tag}[{idx}]")
        cur = parent

    return ProvenanceItem(
        page_no=1,
        charspan=(0, 0),
        bbox=BoundingBox(l=0, t=0, r=0, b=0),
        xpath="".join(reversed(parts)) or "/",
    )

def _begin_capture(self, doc: DoclingDocument) -> dict[str, int]:
    """Snapshot the current length of each collection that may grow.

    Pairs with _end_capture(): the returned mapping records how many
    items each tracked collection held before a subtree was processed.
    """
    tracked = ("texts", "groups", "tables", "pictures")
    return {name: len(getattr(doc, name)) for name in tracked}

def _end_capture(
    self, doc: DoclingDocument, start: dict[str, int], el: Tag
) -> None:
    """Stamp provenance on every item appended since _begin_capture().

    For each tracked collection, any item added after the recorded
    snapshot length gets an XPath provenance entry for ``el``.
    """
    for name, prev_len in start.items():
        collection = getattr(doc, name)
        for added in collection[prev_len:]:
            if not hasattr(added, "prov"):
                continue
            # A fresh ProvenanceItem per item, so no object is shared.
            added.prov.append(self._bs4_xpath(el))

# --- end of provenance helpers ------------------------------------------

def _walk(self, element: Tag, doc: DoclingDocument) -> list[RefItem]:
"""Parse an XML tag by recursively walking its content.

Expand Down Expand Up @@ -535,6 +594,7 @@ def flush_buffer():
docling_code2 = doc.add_code(
parent=self.parents[self.level],
text=seg_clean,
prov=self._bs4_xpath(element),
content_layer=self.content_layer,
formatting=annotated_text.formatting,
hyperlink=annotated_text.hyperlink,
Expand All @@ -545,6 +605,7 @@ def flush_buffer():
parent=self.parents[self.level],
label=DocItemLabel.TEXT,
text=seg_clean,
prov=self._bs4_xpath(element),
content_layer=self.content_layer,
formatting=annotated_text.formatting,
hyperlink=annotated_text.hyperlink,
Expand All @@ -556,24 +617,34 @@ def flush_buffer():
name = node.name.lower()
if name == "img":
flush_buffer()
start_cap = self._begin_capture(doc)
im_ref3 = self._emit_image(node, doc)
self._end_capture(doc, start_cap, node)
if im_ref3:
added_refs.append(im_ref3)
elif name in _FORMAT_TAG_MAP:
with self._use_format([name]):
start_cap = self._begin_capture(doc)
wk = self._walk(node, doc)
self._end_capture(doc, start_cap, node)
added_refs.extend(wk)
elif name == "a":
with self._use_hyperlink(node):
start_cap = self._begin_capture(doc)
wk2 = self._walk(node, doc)
self._end_capture(doc, start_cap, node)
added_refs.extend(wk2)
elif name in _BLOCK_TAGS:
flush_buffer()
start_cap = self._begin_capture(doc)
blk = self._handle_block(node, doc)
self._end_capture(doc, start_cap, node)
added_refs.extend(blk)
elif node.find(_BLOCK_TAGS):
flush_buffer()
start_cap = self._begin_capture(doc)
wk3 = self._walk(node, doc)
self._end_capture(doc, start_cap, node)
added_refs.extend(wk3)
else:
buffer.extend(
Expand Down