diff --git a/README.md b/README.md index 72ab517..9c65701 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,13 @@ This means that misusing the library can lead to leaks where an already preloade You should also upgrade as soon as possible to version 2.0.0 which may mitigate but not totally the situation as we can't by definition control the code consuming this library and any mis-usage, specially concerning cache invalidation calls. If you used this software in adhoc scripts, so in mono user and controlled scenarii, you still are I think still safe. +## Rationale + +I took inspiration from [python-vaultwarden](https://github.com/numberly/python-vaultwarden) to bring the vaultwarden admin site also in this package working. +So it should now be working with vaultwarden admin and bitwarden admin. + +So now this package should be able to give you access to control everything what you want in your bitwarden/vaultwarden instance. + ## Features - API controllable client - Create, Read, Update, Delete, on organizations, collection, ciphers, users (also disable/enable), and attachments @@ -34,6 +41,7 @@ You should also upgrade as soon as possible to version 2.0.0 which may mitigate - Download/Upload attachments to vault and organizations - Integrates a thin wrapper around the official npm CLI (see `call` mathod) - Read [api](src/vaultwardentools/client.py) for more details +- vaultwarden admin access ## Install as a python lib Currently not possible working on this diff --git a/src/vaultwardentools/client.py b/src/vaultwardentools/client.py index 62fe7ba..6ab2a7c 100644 --- a/src/vaultwardentools/client.py +++ b/src/vaultwardentools/client.py @@ -19,7 +19,10 @@ from subprocess import run from time import sleep, time +import httpx import requests +import http +from http.cookiejar import Cookie from jwt import encode as jwt_encode from packaging import version as _version @@ -57,6 +60,7 @@ EMAIL = os.environ.get("BITWARDEN_EMAIL") PASSWORD = os.environ.get("BITWARDEN_PW") ADMIN_PASSWORD = os.environ.get("BITWARDEN_ADMIN_PASSWORD", "") +ADMIN_TOKEN = os.environ.get("BITWARDEN_ADMIN_TOKEN", "") ADMIN_USER = os.environ.get("BITWARDEN_ADMIN_USER", "") CUUID = os.environ.get("BITWARDEN_CLIENT_UUID", "42042042-0042-0042-0042-420004200042") TYPMAPPER = { @@ -281,9 +285,11 @@ class UnimplementedError(BitwardenError): class DecryptError(bwcrypto.DecryptError): """.""" + class WrongVersionOfServer(BitwardenError): """.""" + class SearchError(BitwardenError): """.""" @@ -790,6 +796,7 @@ def __init__( password=PASSWORD, admin_user=ADMIN_USER, admin_password=ADMIN_PASSWORD, + admin_token=ADMIN_TOKEN, private_key=PRIVATE_KEY, client_id="python", client_secret=None, @@ -813,6 +820,7 @@ def __init__( raise RunError("no password") self.admin_user = admin_user self.admin_password = admin_password + self.admin_token = admin_token self._broken_ciphers = OrderedDict() self.vaultier = vaultier self.server = server @@ -835,9 +843,10 @@ def __init__( self.authentication_cb = authentication_cb if login: self.login() - self._is_vaultwarden = False + self._is_vaultwarden = None self._version = version self._api_keys = None + self._token_cookie = None @property def token(self): @@ -848,6 +857,11 @@ def token_set(self, value): self.tokens[self.email] = value return self.tokens[self.email] + def _admin_login(self): + resp = self.adminr("",data={"token": self.admin_token}) + if "VW_ADMIN" in resp.cookies.get_dict(): + self._token_cookie = resp.cookies.get_dict()["VW_ADMIN"] + def adminr( self, uri, @@ -855,19 +869,44 @@ def adminr( headers=None, admin_user=None, admin_password=None, + admin_token=None, + retry=True, *a, **kw, ): - admin_user = admin_user or self.admin_user - admin_password = admin_password or self.admin_password - if admin_user and admin_password: - kw["auth"] = (admin_user, admin_password) + print(uri) + if self._is_vaultwarden is None: + self.version() + if not self._is_vaultwarden: + admin_user = admin_user or self.admin_user + admin_password = admin_password or self.admin_password + if admin_user and admin_password: + kw["auth"] = (admin_user, admin_password) + else: + kw["cookies"] = {"VW_ADMIN": self._token_cookie} url = uri if not url.startswith("http"): url = f"{self.server}/admin{uri}" if headers is None: headers = {} - return getattr(requests, method.lower())(url, headers=headers, *a, **kw) + resp = getattr(requests, method.lower())(url, headers=headers, *a, **kw) + if resp.status_code in [401] and admin_token is not False and retry: + sleep(0.05) + L.debug( + f"Access denied, trying to retry after refreshing token" + ) + self._admin_login() + kw["cookies"] = {"VW_ADMIN": self._token_cookie} + resp = getattr(requests, method.lower())(url, headers=headers, *a, **kw) + if resp.status_code == 429 and retry is not False: + L.debug(f"Too many requests, retrying after 30s {url}") + sleep(60) + resp = getattr(requests, method.lower())(url, headers=headers, *a, **kw) + elif resp.status_code > 399 and retry is not False: + sleep(0.5) + L.debug(f"Something went wrong, retrying {url}") + resp = getattr(requests, method.lower())(url, headers=headers, *a, **kw) + return resp @property def api_keys(self): @@ -2477,21 +2516,21 @@ def post_user_request(self, resp, sync=True): def enable_user(self, email=None, name=None, id=None, user=None): user = self.get_user(email=email, name=name, id=id, user=user) - resp = self.adminr(f"/users/{user.id}/enable") + resp = self.adminr(f"/users/{user.id}/enable", headers={"Content-Type":"application/json"}) self.post_user_request(resp) L.info(f"Enabled user {user.email} / {user.name} / {user.id}") return resp def disable_user(self, email=None, name=None, id=None, user=None): user = self.get_user(email=email, name=name, id=id, user=user) - resp = self.adminr(f"/users/{user.id}/disable") + resp = self.adminr(f"/users/{user.id}/disable", headers={"Content-Type":"application/json"}) self.post_user_request(resp) L.info(f"Disabled user {user.email} / {user.name} / {user.id}") return resp def delete_user(self, email=None, name=None, id=None, user=None, sync=True, **kw): user = self.get_user(email=email, name=name, id=id, user=user, sync=sync) - resp = self.adminr(f"/users/{user.id}/delete") + resp = self.adminr(f"/users/{user.id}/delete", headers={"Content-Type":"application/json"}) self.post_user_request(resp) self.uncache(obj=user, **kw) L.info(f"Deleted user {user.email} / {user.name} / {user.id}") @@ -2977,7 +3016,7 @@ def add_user_to_organization( collections, orga=orga, token=token ) params["collections"] = self.compute_accesses( - dcollections, readonly=readonly, hidepasswords=hidepasswords,manage=manage + dcollections, readonly=readonly, hidepasswords=hidepasswords, manage=manage )["payloads"] u = f"/api/organizations/{orga.id}/users/invite" v, i = self.version() @@ -3170,7 +3209,7 @@ def set_organization_access( return payloads def compute_accesses( - self, dcollections, remove=False, readonly=False, hidepasswords=False,manage=False + self, dcollections, remove=False, readonly=False, hidepasswords=False, manage=False ): ret = {"payloads": [], "remove": []} for cid, col in (dcollections or {}).items(): @@ -3622,9 +3661,9 @@ def get_users_from_group(self, group, orga=None, sync=None, token=None): def edit_group(self, group, - orga = None, - users = None, - collections = None, + orga=None, + users=None, + collections=None, readonly=False, hidepasswords=False, manage=False, @@ -3652,7 +3691,8 @@ def edit_group(self, payload["collections"] = self.compute_accesses( dcollections, readonly=readonly, hidepasswords=hidepasswords, manage=manage )["payloads"] - resp = self.r(f"/api/organizations/{group.organizationId}/groups/{_id}", json=payload, method="put", token=token) + resp = self.r(f"/api/organizations/{group.organizationId}/groups/{_id}", json=payload, method="put", + token=token) self.assert_bw_response(resp, expected_status_codes=[200, 500]) return resp