Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ This means that misusing the library can lead to leaks where an already preloade

You should also upgrade as soon as possible to version 2.0.0 which may mitigate but not totally the situation as we can't by definition control the code consuming this library and any mis-usage, specially concerning cache invalidation calls. If you used this software in adhoc scripts, so in mono user and controlled scenarii, you still are I think still safe.

## Rationale

I took inspiration from [python-vaultwarden](https://github.com/numberly/python-vaultwarden) to bring the vaultwarden admin site also in this package working.
So it should now be working with vaultwarden admin and bitwarden admin.

So now this package should be able to give you access to control everything what you want in your bitwarden/vaultwarden instance.

## Features
- API controllable client
- Create, Read, Update, Delete, on organizations, collection, ciphers, users (also disable/enable), and attachments
Expand All @@ -34,6 +41,7 @@ You should also upgrade as soon as possible to version 2.0.0 which may mitigate
- Download/Upload attachments to vault and organizations
- Integrates a thin wrapper around the official npm CLI (see `call` mathod)
- Read [api](src/vaultwardentools/client.py) for more details
- vaultwarden admin access

## Install as a python lib
Currently not possible working on this
Expand Down
70 changes: 55 additions & 15 deletions src/vaultwardentools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from subprocess import run
from time import sleep, time

import httpx
import requests
import http
from http.cookiejar import Cookie
from jwt import encode as jwt_encode
from packaging import version as _version

Expand Down Expand Up @@ -57,6 +60,7 @@
EMAIL = os.environ.get("BITWARDEN_EMAIL")
PASSWORD = os.environ.get("BITWARDEN_PW")
ADMIN_PASSWORD = os.environ.get("BITWARDEN_ADMIN_PASSWORD", "")
ADMIN_TOKEN = os.environ.get("BITWARDEN_ADMIN_TOKEN", "")
ADMIN_USER = os.environ.get("BITWARDEN_ADMIN_USER", "")
CUUID = os.environ.get("BITWARDEN_CLIENT_UUID", "42042042-0042-0042-0042-420004200042")
TYPMAPPER = {
Expand Down Expand Up @@ -281,9 +285,11 @@ class UnimplementedError(BitwardenError):
class DecryptError(bwcrypto.DecryptError):
"""."""


class WrongVersionOfServer(BitwardenError):
"""."""


class SearchError(BitwardenError):
"""."""

Expand Down Expand Up @@ -790,6 +796,7 @@ def __init__(
password=PASSWORD,
admin_user=ADMIN_USER,
admin_password=ADMIN_PASSWORD,
admin_token=ADMIN_TOKEN,
private_key=PRIVATE_KEY,
client_id="python",
client_secret=None,
Expand All @@ -813,6 +820,7 @@ def __init__(
raise RunError("no password")
self.admin_user = admin_user
self.admin_password = admin_password
self.admin_token = admin_token
self._broken_ciphers = OrderedDict()
self.vaultier = vaultier
self.server = server
Expand All @@ -835,9 +843,10 @@ def __init__(
self.authentication_cb = authentication_cb
if login:
self.login()
self._is_vaultwarden = False
self._is_vaultwarden = None
self._version = version
self._api_keys = None
self._token_cookie = None

@property
def token(self):
Expand All @@ -848,26 +857,56 @@ def token_set(self, value):
self.tokens[self.email] = value
return self.tokens[self.email]

def _admin_login(self):
resp = self.adminr("",data={"token": self.admin_token})
if "VW_ADMIN" in resp.cookies.get_dict():
self._token_cookie = resp.cookies.get_dict()["VW_ADMIN"]

def adminr(
self,
uri,
method="post",
headers=None,
admin_user=None,
admin_password=None,
admin_token=None,
retry=True,
*a,
**kw,
):
admin_user = admin_user or self.admin_user
admin_password = admin_password or self.admin_password
if admin_user and admin_password:
kw["auth"] = (admin_user, admin_password)
print(uri)
if self._is_vaultwarden is None:
self.version()
if not self._is_vaultwarden:
admin_user = admin_user or self.admin_user
admin_password = admin_password or self.admin_password
if admin_user and admin_password:
kw["auth"] = (admin_user, admin_password)
else:
kw["cookies"] = {"VW_ADMIN": self._token_cookie}
url = uri
if not url.startswith("http"):
url = f"{self.server}/admin{uri}"
if headers is None:
headers = {}
return getattr(requests, method.lower())(url, headers=headers, *a, **kw)
resp = getattr(requests, method.lower())(url, headers=headers, *a, **kw)
if resp.status_code in [401] and admin_token is not False and retry:
sleep(0.05)
L.debug(
f"Access denied, trying to retry after refreshing token"
)
self._admin_login()
kw["cookies"] = {"VW_ADMIN": self._token_cookie}
resp = getattr(requests, method.lower())(url, headers=headers, *a, **kw)
if resp.status_code == 429 and retry is not False:
L.debug(f"Too many requests, retrying after 30s {url}")
sleep(60)
resp = getattr(requests, method.lower())(url, headers=headers, *a, **kw)
elif resp.status_code > 399 and retry is not False:
sleep(0.5)
L.debug(f"Something went wrong, retrying {url}")
resp = getattr(requests, method.lower())(url, headers=headers, *a, **kw)
return resp

@property
def api_keys(self):
Expand Down Expand Up @@ -2477,21 +2516,21 @@ def post_user_request(self, resp, sync=True):

def enable_user(self, email=None, name=None, id=None, user=None):
user = self.get_user(email=email, name=name, id=id, user=user)
resp = self.adminr(f"/users/{user.id}/enable")
resp = self.adminr(f"/users/{user.id}/enable", headers={"Content-Type":"application/json"})
self.post_user_request(resp)
L.info(f"Enabled user {user.email} / {user.name} / {user.id}")
return resp

def disable_user(self, email=None, name=None, id=None, user=None):
user = self.get_user(email=email, name=name, id=id, user=user)
resp = self.adminr(f"/users/{user.id}/disable")
resp = self.adminr(f"/users/{user.id}/disable", headers={"Content-Type":"application/json"})
self.post_user_request(resp)
L.info(f"Disabled user {user.email} / {user.name} / {user.id}")
return resp

def delete_user(self, email=None, name=None, id=None, user=None, sync=True, **kw):
user = self.get_user(email=email, name=name, id=id, user=user, sync=sync)
resp = self.adminr(f"/users/{user.id}/delete")
resp = self.adminr(f"/users/{user.id}/delete", headers={"Content-Type":"application/json"})
self.post_user_request(resp)
self.uncache(obj=user, **kw)
L.info(f"Deleted user {user.email} / {user.name} / {user.id}")
Expand Down Expand Up @@ -2977,7 +3016,7 @@ def add_user_to_organization(
collections, orga=orga, token=token
)
params["collections"] = self.compute_accesses(
dcollections, readonly=readonly, hidepasswords=hidepasswords,manage=manage
dcollections, readonly=readonly, hidepasswords=hidepasswords, manage=manage
)["payloads"]
u = f"/api/organizations/{orga.id}/users/invite"
v, i = self.version()
Expand Down Expand Up @@ -3170,7 +3209,7 @@ def set_organization_access(
return payloads

def compute_accesses(
self, dcollections, remove=False, readonly=False, hidepasswords=False,manage=False
self, dcollections, remove=False, readonly=False, hidepasswords=False, manage=False
):
ret = {"payloads": [], "remove": []}
for cid, col in (dcollections or {}).items():
Expand Down Expand Up @@ -3622,9 +3661,9 @@ def get_users_from_group(self, group, orga=None, sync=None, token=None):

def edit_group(self,
group,
orga = None,
users = None,
collections = None,
orga=None,
users=None,
collections=None,
readonly=False,
hidepasswords=False,
manage=False,
Expand Down Expand Up @@ -3652,7 +3691,8 @@ def edit_group(self,
payload["collections"] = self.compute_accesses(
dcollections, readonly=readonly, hidepasswords=hidepasswords, manage=manage
)["payloads"]
resp = self.r(f"/api/organizations/{group.organizationId}/groups/{_id}", json=payload, method="put", token=token)
resp = self.r(f"/api/organizations/{group.organizationId}/groups/{_id}", json=payload, method="put",
token=token)
self.assert_bw_response(resp, expected_status_codes=[200, 500])
return resp

Expand Down
Loading