Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ sentence-transformers = "^5.2.2"
spacy = "^3.8.11"
refinedoc = "^1.0.0"
qdrant-client = "1.16.2"
python-dotenv = "^1.2.2"
beautifulsoup4 = "^4.14.3"
pyphen = "^0.17.2"
ijson = "^3.4.0"
Expand Down
4 changes: 3 additions & 1 deletion tests/document_collector_hub/plugins_test/test_open_alex.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

def build_openalex_result(
url: str = "https://openalex.org/W123",
doi: str = "10.1234/example",
doi: str = "https://doi.org/10.1234/example",
title: str = "Sample Title",
):
ids = Ids(openalex=url, doi=doi, mag="", pmid="", pmcid="")
Expand Down Expand Up @@ -216,6 +216,8 @@ def test_update_welearn_document_returns_expected_document(self, mock_pdf):
doc.details["authors"][0]["name"],
openalex_result.authorships[0].author.display_name,
)
self.assertEqual(doc.external_id, "10.1234/example")
self.assertEqual(doc.external_id_type, "doi")

# Test _update_welearn_document raises on closed access
@patch("welearn_datastack.plugins.rest_requesters.open_alex.get_new_https_session")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def test_plugin_run_success(self, mock_get_session):
self.assertTrue(doc_result.document.title)
self.assertTrue(doc_result.document.description)
self.assertTrue(doc_result.document.full_content)
self.assertEqual(doc_result.document.external_id, "10.7717/peerj.12713")
self.assertEqual(doc_result.document.external_id_type, "doi")
self.assertIsInstance(doc_result.document.details, dict)
self.assertIn("license_url", doc_result.document.details)
self.assertIn("authors", doc_result.document.details)
Expand Down
10 changes: 7 additions & 3 deletions welearn_datastack/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,23 @@ def __init__(self, message="Invalid language code, must be lower ISO-639-1 code"
super().__init__(self.message)


class VersionNumberError(Exception):
    """
    Raised when an invalid version number is used

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers catch it.

    :param message: Explanation of the error, also kept on ``self.message``
    """

    def __init__(self, message="Invalid version number, must be an integer"):
        self.message = message
        super().__init__(self.message)


class NoPreviousCollectionError(Exception):
    """
    Raised when there is no previous collection

    Derives from Exception (not BaseException) so that ordinary
    ``except Exception`` handlers catch it.

    :param message: Explanation of the error, also kept on ``self.message``
    """

    def __init__(self, message="No previous collection found"):
        self.message = message
        super().__init__(self.message)


class NoConnectedCollectionError(BaseException):
class NoConnectedCollectionError(Exception):
"""Raised when there is no connected collection"""

def __init__(self, message="No connected collection found"):
Expand Down Expand Up @@ -238,3 +238,7 @@ def __init__(self, msg="No title found in this document", *args):

class NoDescriptionFoundError(NotEnoughData):
    """
    Raised when there is no description found

    Specialisation of NotEnoughData that adds no state or behaviour of its own.
    """


class NoDOIFoundError(NotEnoughData):
    """
    Raised when there is no DOI found

    Specialisation of NotEnoughData that adds no state or behaviour of its own.
    """

    # NOTE(review): no raise site for this exception is visible in this change - confirm where it is used.
256 changes: 179 additions & 77 deletions welearn_datastack/plugins/rest_requesters/open_alex.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from dataclasses import asdict
from datetime import datetime
from itertools import batched
from typing import Iterable
from typing import Any, Iterable
from urllib.parse import urlparse

from welearn_database.data.enumeration import ExternalIdType
from welearn_database.data.models import WeLearnDocument

from welearn_datastack.constants import (
Expand Down Expand Up @@ -69,10 +70,11 @@ def __init__(self) -> None:
self.team_email = team_email

@staticmethod
def _invert_abstract(inv_index: dict[str, list[int]]) -> str:
def _invert_abstract(inv_index: dict[str, list[int]]) -> str | None:
if inv_index is not None:
l_inv = [(w, p) for w, pos in inv_index.items() for p in pos]
return " ".join(map(lambda x: x[0], sorted(l_inv, key=lambda x: x[1])))
return " ".join([x[0] for x in sorted(l_inv, key=lambda x: x[1])])
return None

@staticmethod
def _extract_openalex_id_from_urls(urls: Iterable[str]) -> list[str]:
Expand Down Expand Up @@ -183,44 +185,98 @@ def _remove_useless_first_word(
return string_to_clear

def _update_welearn_document(self, wrapper: WrapperRawData) -> WeLearnDocument:
    """
    Fill the WeLearn document of the wrapper from its raw data, after checking that the content may be used
    :param wrapper: WrapperRawData holding both the raw data and the document to update
    :return: the updated document (the same object as ``wrapper.document``)
    :exception UnauthorizedPublisher: If the publisher is not authorized to be used in WeLearn
    :exception ClosedAccessContent: If the document is not open access
    :exception UnauthorizedLicense: If the license of the document is not in the list of authorized licenses
    """
    document_url = wrapper.raw_data.ids.openalex
    logger.info(f"Process {document_url}...")

    # Legal checks run first so that nothing is written on a rejected document.
    self._check_publisher_authorization(wrapper)
    self._check_access(document_url, wrapper)
    self._check_license(document_url, wrapper)
    logger.info(f"The content {document_url} is legally usable")

    document_desc = self.build_description(wrapper)
    document_content, pdf_flag = self._resolve_full_content(document_desc, wrapper)
    document_details = self._build_details(document_url, pdf_flag, wrapper)

    wrapper.document.title = wrapper.raw_data.title
    wrapper.document.description = document_desc
    wrapper.document.content = document_content
    wrapper.document.details = document_details
    wrapper.document.external_id = self._get_doi(wrapper)
    wrapper.document.external_id_type = ExternalIdType.DOI

    return wrapper.document

def _build_details(
    self,
    document_url: str | None,
    pdf_flag: bool,
    wrapper: WrapperRawData,
) -> dict[str, Any]:
    """
    Build the details of the document in a dict format expected by the WeLearn DB
    :param document_url: URL of the document to build the details for (passed to the license lookup, which uses it for logging)
    :param pdf_flag: flag indicating if the content of the document is from the PDF or not (stored under "content_from_pdf")
    :param wrapper: WrapperRawData containing the raw data of the document to build the details from
    :return: dict containing the details of the document in the format expected by the WeLearn DB
    """
    raw_data = wrapper.raw_data
    # Publisher and ISSN both come from the source of the best open-access location.
    best_source = raw_data.best_oa_location.source
    return {
        "publication_date": self._build_publication_date(wrapper),
        "type": raw_data.type,
        "doi": self._get_doi(wrapper),
        "publisher": best_source.host_organization_name,
        "license_url": self._get_licence(document_url, wrapper),
        "issn": best_source.issn_l,
        "content_from_pdf": pdf_flag,
        "topics": [asdict(t) for t in self._transform_topics(raw_data.topics)],
        "tags": [x.display_name for x in raw_data.keywords],
        "referenced_works": raw_data.referenced_works,
        "related_works": raw_data.related_works,
        "authors": self._build_authors_list(wrapper),
    }

@staticmethod
def _get_doi(wrapper: WrapperRawData) -> str | None:
doi = wrapper.raw_data.ids.doi
Comment thread
lpi-tn marked this conversation as resolved.
if doi.startswith("https://doi.org/"):
doi = doi.replace("https://doi.org/", "")
return doi

@staticmethod
def _build_authors_list(wrapper: WrapperRawData) -> list[Any]:
authors = []
for author_info in wrapper.raw_data.authorships:
authors.append(
{
"name": author_info.author.display_name,
"misc": ",".join(author_info.raw_affiliation_strings),
}
)
if (
host_organization_lineage_malformed is None
or len(host_organization_lineage_malformed) == 0
):
continue
try:
host_organization_lineage = self._extract_openalex_id_from_urls(
host_organization_lineage_malformed
)
host_ids.extend(host_organization_lineage)
except ManagementExceptions as e:
logger.warning(
f"Cannot extract host organization lineage from {location.source.host_organization_lineage}: {e}"
)
continue
return authors

avoiding_ids = PUBLISHERS_TO_AVOID
for host_id in host_ids:
if host_id.upper() in avoiding_ids:
raise UnauthorizedPublisher(f"{host_id} is not authorized in welearn")
@staticmethod
def _build_publication_date(wrapper: WrapperRawData) -> int:
    """
    Convert the publication date of the document into a timestamp
    :param wrapper: WrapperRawData containing the raw data of the document to read the publication date from
    :return: publication date as a POSIX timestamp truncated to an int
    """
    parsed_date = datetime.strptime(
        wrapper.raw_data.publication_date, YEAR_FIRST_DATE_FORMAT
    )
    # NOTE(review): if YEAR_FIRST_DATE_FORMAT carries no UTC offset the parsed
    # value is naive and timestamp() reads it as local time - confirm intended.
    return int(parsed_date.timestamp())

def _resolve_full_content(
self, document_desc: str | Any, wrapper: WrapperRawData
) -> tuple[bool, str | Any]:
"""
Get the full content of the document. If the PDF is available and can be retrieved, extract the content from the PDF. Otherwise, use the description as the content.
:param document_desc: Description of the document to use as content if the PDF is not available or cannot be retrieved
:param wrapper: WrapperRawData containing the raw data of the document to get the content from
:return: tuple ``(content, pdf_flag)``: the content of the document first, then the flag indicating if that content is from the PDF
Comment thread
lpi-tn marked this conversation as resolved.
"""
document_content = document_desc

if wrapper.raw_data.best_oa_location.pdf_url is None:
pdf_flag = False
else:
Expand All @@ -240,65 +296,111 @@ def _update_welearn_document(self, wrapper: WrapperRawData) -> WeLearnDocument:
f"PDF retrievement error, use description as content: {e}"
)
pdf_flag = False
return document_content, pdf_flag

publication_date = int(
datetime.strptime(
wrapper.raw_data.publication_date, YEAR_FIRST_DATE_FORMAT
).timestamp()
)

authors = []
for author_info in wrapper.raw_data.authorships:
authors.append(
{
"name": author_info.author.display_name,
"misc": ",".join(author_info.raw_affiliation_strings),
}
def build_description(self, wrapper: WrapperRawData) -> str | Any:
    """
    Build the description of the document from its inverted-index abstract
    :param wrapper: WrapperRawData containing the raw data of the document to read the abstract from
    :return: the rebuilt abstract (empty string when there is none) after it went through _remove_useless_first_word
    """
    abstract = self._invert_abstract(wrapper.raw_data.abstract_inverted_index)
    return self._remove_useless_first_word(
        string_to_clear=abstract or "",
        useless_words=["background", "abstract", "introduction"],
    )

@staticmethod
def _check_access(document_url: str, wrapper: WrapperRawData) -> None:
    """
    Check if the document is open access. If not, raise a ClosedAccessContent exception
    :param document_url: URL of the document to check (used for logging purposes)
    :param wrapper: WrapperRawData containing the raw data of the document to check
    :exception ClosedAccessContent: If the document is not open access
    """
    if not wrapper.raw_data.open_access.is_oa:
        raise ClosedAccessContent()
    logger.info(f"The content {document_url} is open access")

best_oa_location_info = wrapper.raw_data.best_oa_location
def _check_license(self, document_url: str, wrapper: WrapperRawData) -> None:
    """
    Check if the license of the document is in the list of authorized licenses. If not, raise an UnauthorizedLicense exception
    :param document_url: URL of the document to check (used for logging purposes)
    :param wrapper: WrapperRawData containing the raw data of the document to check
    :exception UnauthorizedLicense: If the license of the document is not in the list of authorized licenses
    """
    # A missing license comes back from _get_licence as "", which is then
    # looked up in AUTHORIZED_LICENSES like any other value.
    license_url = self._get_licence(document_url, wrapper).lower()
    if license_url not in AUTHORIZED_LICENSES:
        raise UnauthorizedLicense(f"{license_url} is not allowed")
@staticmethod
def _get_licence(document_url: str | None, wrapper: WrapperRawData) -> str:
    """
    Get the license of the document in a good format (https://creativecommons.org/licenses/xxx/4.0/) from the raw data of the document. If the license is not in the expected format, log a warning and return it in lowercase.
    :param document_url: URL of the document to get the license from (used for logging purposes)
    :param wrapper: WrapperRawData containing the raw data of the document to get the license from
    :return: License of the document in a good format (https://creativecommons.org/licenses/xxx/4.0/) if it is in the expected format, otherwise in lowercase; empty string when there is no license
    """
    # Open Alex format is cc-by...
    license_openalex_format: str | None = wrapper.raw_data.best_oa_location.license
    if license_openalex_format is None:
        logger.warning(
            f"No license found for {document_url}, set it to empty string"
        )
        return ""

    if not license_openalex_format.startswith("cc-"):
        logger.warning(
            f"License {license_openalex_format} of {document_url} is not in the expected format, set it to lowercase"
        )
        return license_openalex_format.lower()

    # Only the leading "cc-" marker is dropped; the rest is the license code.
    license_code = license_openalex_format.removeprefix("cc-")
    return f"{HTTPS_CREATIVE_COMMONS}/licenses/{license_code}/4.0/"

if license_good_format.lower() not in AUTHORIZED_LICENSES:
raise UnauthorizedLicense(f"{license_good_format.lower()} is not allowed")
def _check_publisher_authorization(self, wrapper: WrapperRawData) -> None:
    """
    Check if the publisher of the document is authorized to be used in WeLearn. If not, raise an UnauthorizedPublisher exception

    :param wrapper: WrapperRawData containing the raw data of the document to check
    :exception UnauthorizedPublisher: If the publisher is not authorized to be used in WeLearn
    """
    work_locations: list[Location] = wrapper.raw_data.locations
    host_ids = self.get_host_ids(work_locations)

    avoiding_ids = PUBLISHERS_TO_AVOID
    for host_id in host_ids:
        # Host ids are upper-cased before the lookup in PUBLISHERS_TO_AVOID.
        if host_id.upper() in avoiding_ids:
            raise UnauthorizedPublisher(f"{host_id} is not authorized in welearn")

def get_host_ids(self, work_locations: list[Location]) -> list[str]:
    """
    Get the host organization lineage from the work locations and extract the OpenAlex IDs from it. If the host organization lineage is not in the expected format, log a warning and skip it.

    :param work_locations: list of Location objects containing the host organization lineage to extract the OpenAlex IDs from
    :return: list of OpenAlex IDs extracted from the host organization lineage
    """
    host_ids: list[str] = []
    for location in work_locations:
        host_organization_lineage_malformed: list[str] | None = (
            location.source.host_organization_lineage
        )
        # Missing (None) or empty lineage: nothing to extract for this location.
        if not host_organization_lineage_malformed:
            continue
        try:
            host_ids.extend(
                self._extract_openalex_id_from_urls(
                    host_organization_lineage_malformed
                )
            )
        except ManagementExceptions as e:
            # Best effort: a bad lineage is reported and skipped, not fatal.
            logger.warning(
                f"Cannot extract host organization lineage from {location.source.host_organization_lineage}: {e}"
            )
    return host_ids

def run(self, documents: list[WeLearnDocument]) -> list[WrapperRetrieveDocument]:
ret: list[WrapperRetrieveDocument] = []
Expand Down
Loading
Loading