diff --git a/wikibaseintegrator/wbi_helpers.py b/wikibaseintegrator/wbi_helpers.py index f6741407..1c7e1348 100644 --- a/wikibaseintegrator/wbi_helpers.py +++ b/wikibaseintegrator/wbi_helpers.py @@ -8,7 +8,7 @@ import logging import re from time import sleep -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Union from urllib.parse import urlparse import requests @@ -338,33 +338,94 @@ def edit_entity(data: dict, id: str | None = None, type: str | None = None, base return mediawiki_api_call_helper(data=params, is_bot=is_bot, **kwargs) +def create_redirect(from_id: str, to_id: str, login: _Login | None = None) -> dict[str, Any]: + """ + Create a MediaWiki redirect from `from_id` to `to_id`. + """ + log.info(f"Creating redirect from {from_id} → {to_id}") + + data = { + "action": "wbcreateredirect", + "from": from_id, + "to": to_id, + "format": "json" + } -def merge_items(from_id: str, to_id: str, login: _Login | None = None, ignore_conflicts: list[str] | None = None, is_bot: bool = False, **kwargs: Any) -> dict: + try: + response = mediawiki_api_call_helper(data=data, login=login, is_bot=True) + if "error" in response: + log.error(f"Failed to create redirect {from_id} → {to_id}: {response['error']}") + else: + log.info(f"Redirect created successfully: {from_id} → {to_id}") + return response + except Exception as e: + log.error(f"Exception when creating redirect {from_id} → {to_id}: {e}") + return {"error": str(e)} + +def merge_items_and_create_redirect( + qids: list[str], + login: _Login | None = None, + ignore_conflicts: Union[str, list[str]] | None = "description", + is_bot: bool = False, + tags: Union[str, list[str]] | None = None, + **kwargs: Any +) -> str: """ - A static method to merge two items + Merge multiple Wikibase items into the lowest QID. - :param from_id: The ID to merge from. This parameter is required. - :param to_id: The ID to merge to. This parameter is required. + :param qids: List of item QIDs to merge. The lowest QID will be kept. :param login: A wbi_login.Login instance - :param ignore_conflicts: List of elements of the item to ignore conflicts for. Can only contain values of "description", "sitelink" and "statement" - :param is_bot: Mark this edit as bot. + :param ignore_conflicts: List of elements to ignore conflicts for. Can contain "description", "sitelink", "statement". Defaults to "description" for merge to work at all. + :param is_bot: Mark this edit as bot + :param tags: Single tag string or list of tags to attach to the edit + :param kwargs: Additional parameters to pass to mediawiki_api_call_helper + + :return: Final QID after merge. """ + if not qids or len(qids) < 2: + raise ValueError("You must provide at least two QIDs to merge") - params = { - 'action': 'wbmergeitems', - 'fromid': from_id, - 'toid': to_id, - 'format': 'json' - } + # Sort QIDs numerically to keep the lowest + sorted_qids = sorted(qids, key=lambda x: int(x.lstrip('Q'))) + to_id = sorted_qids[0] # keep the lowest QID + from_ids = sorted_qids[1:] # merge all other QIDs into to_id - if ignore_conflicts is not None: - params.update({'ignoreconflicts': '|'.join(ignore_conflicts)}) + # Prepare tags string if provided + tags_str = None + if tags: + if isinstance(tags, list): + tags_str = '|'.join(tags) + else: + tags_str = str(tags) - if is_bot: - params.update({'bot': ''}) + for from_id in from_ids: + params = { + 'action': 'wbmergeitems', + 'fromid': from_id, + 'toid': to_id, + 'format': 'json' + } - return mediawiki_api_call_helper(data=params, login=login, is_bot=is_bot, **kwargs) + if ignore_conflicts is not None: + params['ignoreconflicts'] = '|'.join(ignore_conflicts) + + if is_bot: + params['bot'] = '' + if tags_str: + params['tags'] = tags_str + + try: + mediawiki_api_call_helper(data=params, login=login, is_bot=is_bot, **kwargs) + print(f"Merged {from_id} into {to_id}") + create_redirect(from_id=from_id, to_id=to_id, login=login) + return to_id + except Exception as e: + print(f"Error merging {from_id} into {to_id}: {e}. " + f"Note: sometimes the real cause does not get propagated because of a bug in Wikibase. " + f"See https://github.com/dpriskorn/DanceDatabase/issues/2") + return "" + return "" def merge_lexemes(source: str, target: str, login: _Login | None = None, summary: str | None = None, is_bot: bool = False, **kwargs: Any) -> dict: """