diff --git a/.github/workflows/install-dependencies-and-run-tests.yml b/.github/workflows/install-dependencies-and-run-tests.yml index a5fd472..27651e2 100644 --- a/.github/workflows/install-dependencies-and-run-tests.yml +++ b/.github/workflows/install-dependencies-and-run-tests.yml @@ -42,6 +42,8 @@ jobs: VERIFY_SSL: ${{ secrets.VERIFY_SSL }} NESTED: ${{ secrets.NESTED }} ENABLE_IPV6: ${{ secrets.ENABLE_IPV6 }} + JWT_TIMBR_URL: ${{ secrets.JWT_TIMBR_URL }} + JWT_TIMBR_ONTOLOGY: ${{ secrets.JWT_TIMBR_ONTOLOGY }} JWT_TENANT_ID: ${{ secrets.JWT_TENANT_ID }} JWT_CLIENT_ID: ${{ secrets.JWT_CLIENT_ID }} JWT_USERNAME: ${{ secrets.JWT_USERNAME }} diff --git a/README.md b/README.md index fc38472..be74c02 100644 --- a/README.md +++ b/README.md @@ -43,18 +43,22 @@ This project is a pure python connector to timbr (no dependencies required). nested = "", verify_ssl = , enable_IPv6 = , + is_jwt = , + jwt_tenant_id = "", + additional_headers = <{ "x-api-impersonate-user": "" }>, ) - # url - Required - String - The IP / Hostname of the Timbr platform. - # ontology - Required - String - The ontology / knowledge graph to connect to. - # token - Required - String - Timbr token value or JWT token value. Note: If you are using JWT token, you need to set the is_jwt parameter to True. - # query - Required - String - The query that you want to execute. - # datasource - Optional - String - Add the specific datasource name that you want to query from, the default value is the current active datasource of your ontology. - # nested - Optional - String - Change to 'true' if nested flag needs to be enabled. make sure this flag contains string value not bool value. - # verify_ssl - Optional - Boolean - Verifying the target server's SSL Certificate, use False to disable this process. - # enable_IPv6 - Optional - Boolean - Change to 'true' if you are using IPv6 connection. - # is_jwt - Optional - Boolean - Set to True if you are using JWT token, otherwise set to False. - # jwt_tenant_id - Optional - String - The tenant ID for JWT authentication + # url - Required - String - The IP / Hostname of the Timbr platform. + # ontology - Required - String - The ontology / knowledge graph to connect to. + # token - Required - String - Timbr token value or JWT token value. Note: If you are using JWT token, you need to set the is_jwt parameter to True. + # query - Required - String - The query that you want to execute. + # datasource - Optional - String - Add the specific datasource name that you want to query from, the default value is the current active datasource of your ontology. + # nested - Optional - String - Change to 'true' if nested flag needs to be enabled. make sure this flag contains string value not bool value. + # verify_ssl - Optional - Boolean - Verifying the target server's SSL Certificate, use False to disable this process. + # enable_IPv6 - Optional - Boolean - Change to 'true' if you are using IPv6 connection. + # is_jwt - Optional - Boolean - Set to True if you are using JWT token, otherwise set to False. + # jwt_tenant_id - Optional - String - The tenant ID for JWT authentication + # additional_headers - Optional - Dict - Extra Timbr connection parameters sent with every request (e.g., 'x-api-impersonate-user'). ``` ### Using Timbr token diff --git a/examples/example.py b/examples/example.py index 5997803..ea9e347 100644 --- a/examples/example.py +++ b/examples/example.py @@ -14,18 +14,22 @@ nested = "", verify_ssl = , enable_IPv6 = , + is_jwt = , + jwt_tenant_id = "", + additional_headers = <{ "x-api-impersonate-user": "" }>, ) - # url - Required - String - The IP / Hostname of the Timbr platform. - # ontology - Required - String - The ontology / knowledge graph to connect to. - # token - Required - String - Timbr token value or JWT token value. Note: If you are using JWT token, you need to set the is_jwt parameter to True. - # query - Required - String - The query that you want to execute. - # datasource - Optional - String - Add the specific datasource name that you want to query from, the default value is the current active datasource of your ontology. - # nested - Optional - String - Change to 'true' if nested flag needs to be enabled. make sure this flag contains string value not bool value. - # verify_ssl - Optional - Boolean - Verifying the target server's SSL Certificate, use False to disable this process. - # enable_IPv6 - Optional - Boolean - Change to 'true' if you are using IPv6 connection. - # is_jwt - Optional - Boolean - Set to True if you are using JWT token, otherwise set to False. - # jwt_tenant_id - Optional - String - The tenant ID for JWT authentication + # url - Required - String - The IP / Hostname of the Timbr platform. + # ontology - Required - String - The ontology / knowledge graph to connect to. + # token - Required - String - Timbr token value or JWT token value. Note: If you are using JWT token, you need to set the is_jwt parameter to True. + # query - Required - String - The query that you want to execute. + # datasource - Optional - String - Add the specific datasource name that you want to query from, the default value is the current active datasource of your ontology. + # nested - Optional - String - Change to 'true' if nested flag needs to be enabled. make sure this flag contains string value not bool value. + # verify_ssl - Optional - Boolean - Verifying the target server's SSL Certificate, use False to disable this process. + # enable_IPv6 - Optional - Boolean - Change to 'true' if you are using IPv6 connection. + # is_jwt - Optional - Boolean - Set to True if you are using JWT token, otherwise set to False. + # jwt_tenant_id - Optional - String - The tenant ID for JWT authentication + # additional_headers - Optional - Dict - Extra Timbr connection parameters sent with every request (e.g., 'x-api-impersonate-user'). # HTTP example response = timbr.run_query( diff --git a/pytimbr_api/timbr_http_connector.py b/pytimbr_api/timbr_http_connector.py index 7956c95..60e3fda 100644 --- a/pytimbr_api/timbr_http_connector.py +++ b/pytimbr_api/timbr_http_connector.py @@ -35,6 +35,7 @@ def run_query( enable_IPv6: bool = False, is_jwt: bool = False, jwt_tenant_id: str = None, + additional_headers: dict = None, ): datasource_addition = '' if datasource: @@ -57,6 +58,10 @@ def run_query( else: headers['x-api-key'] = token + if additional_headers: + for key, value in additional_headers.items(): + headers[key.replace('_', '-')] = value + requests.packages.urllib3.util.connection.HAS_IPV6 = enable_IPv6 response = requests.post( f'{base_url}timbr/openapi/ontology/{ontology}/query{datasource_addition}', diff --git a/test/conftest.py b/test/conftest.py index 65974bb..93e8c23 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -31,6 +31,8 @@ def test_config(): "verify_ssl": convert_env_to_bool(os.getenv("VERIFY_SSL")), "nested": os.getenv("NESTED"), "enableIPv6": convert_env_to_bool(os.getenv("ENABLE_IPV6")), + "jwt_timbr_url": os.getenv("JWT_TIMBR_URL"), + "jwt_timbr_ontology": os.getenv("JWT_TIMBR_ONTOLOGY"), "jwt_tenant_id": os.getenv("JWT_TENANT_ID"), "jwt_client_id": os.getenv("JWT_CLIENT_ID"), "jwt_username": os.getenv("JWT_USERNAME"), diff --git a/test/test_jwt_token.py b/test/test_jwt_token.py index 6b10ae3..2579b7f 100644 --- a/test/test_jwt_token.py +++ b/test/test_jwt_token.py @@ -33,8 +33,8 @@ def test_query_using_jwt(test_config): assert False, f"Error fetching access token: {tokens}" results = run_query( - url=test_config['url'], - ontology=test_config['ontology'], + url=test_config['jwt_timbr_url'], + ontology=test_config['jwt_timbr_ontology'], token=access_token, query='SELECT 1', datasource=test_config['datasource'], diff --git a/test/test_user_impersonation.py b/test/test_user_impersonation.py new file mode 100644 index 0000000..1bbe53f --- /dev/null +++ b/test/test_user_impersonation.py @@ -0,0 +1,166 @@ +import requests +import pytest +import time +from pytimbr_api.timbr_http_connector import run_query +import uuid + +create_granting_user_stmt = "CREATE USER {username} OPTIONS(email='{username}@timbr-test.ai', password='{password}', first_name='{first_name}', last_name='{last_name}');" \ + "GRANT QUERY ON ALL DATASOURCE TO USER {username};" \ + "GRANT ACCESS ON ALL ONTOLOGY TO USER {username};" \ + "GRANT EDIT ON ALL USER TO USER {username};" + +create_impersonating_user_stmt = "CREATE USER {username} OPTIONS(email='{username}@timbr-test.ai', password='{password}', first_name='{first_name}', last_name='{last_name}');" \ + "GRANT ACCESS ON ALL ONTOLOGY TO USER {username};" \ + "GRANT QUERY ON ALL USER TO USER {username};" + +grant_auth_stmt = "GRANT AUTH ON user.{granting} TO USER `{impersonating}`" + +revoke_auth_stmt = "REVOKE AUTH ON user.{granting} FROM USER `{impersonating}`" + +drop_user_stmt = "REVOKE EDIT ON ALL USER FROM USER {username};" \ +"REVOKE ACCESS ON ALL ONTOLOGY FROM USER {username};" \ +"REVOKE QUERY ON ALL DATASOURCE FROM USER {username};" \ +"DROP USER {username};" + +# Generate unique suffix using timestamp and UUID +unique_suffix = str(uuid.uuid4())[:8] +granting_user = f"timbr_python_http_granting_user_{unique_suffix}" +impersonating_user = f"timbr_python_http_impersonating_user_{unique_suffix}" + +def create_users(test_config): + print("Creating users...") + granting_user_stmt = create_granting_user_stmt.format(username=granting_user, password="SecurePassword123", first_name="Granting", last_name="User") + impersonating_user_stmt = create_impersonating_user_stmt.format(username=impersonating_user, password="SecurePassword123", first_name="Impersonating", last_name="User") + + run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=test_config['token'], + query=granting_user_stmt, + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + ) + run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=test_config['token'], + query=impersonating_user_stmt, + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + ) + + grant_auth = grant_auth_stmt.format(granting=granting_user, impersonating=impersonating_user) + run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=test_config['token'], + query=grant_auth, + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + ) + + # Waiting for user creation to propagate + run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=test_config['token'], + query="refresh permissions", + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + ) + + +def drop_users(test_config): + print("\nDropping users...") + revoke_cmd = revoke_auth_stmt.format(granting=granting_user, impersonating=impersonating_user) + run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=test_config['token'], + query=revoke_cmd, + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + ) + + drop_impersonating_user_stmt = drop_user_stmt.format(username=impersonating_user) + run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=test_config['token'], + query=drop_impersonating_user_stmt, + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + ) + + drop_granting_user_stmt = drop_user_stmt.format(username=granting_user) + run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=test_config['token'], + query=drop_granting_user_stmt, + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + ) + +@pytest.fixture(scope="class") +def setup_test_users(test_config): + """Fixture to create users at the start and drop them at the end""" + # Setup: Create users + create_users(test_config) + + yield test_config # This provides the test_config to the test + + # Teardown: Drop users (runs even if tests fail) + try: + drop_users(test_config) + except Exception as e: + print(f"Warning: Failed to drop users during teardown: {e}") + +class TestUserImpersonation: + def test_user_impersonation(self, setup_test_users): + test_config = setup_test_users + + impersonating_user_token_res = run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=test_config['token'], + query=f"show token for `{impersonating_user}`", + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + ) + + impersonating_user_token = impersonating_user_token_res[0]['token'] + + # Act + time.sleep(5) # Make sure the permissions are propagated + res = run_query( + url=test_config['url'], + ontology=test_config['ontology'], + token=impersonating_user_token, + query='select timbr_username()', + datasource=test_config['datasource'], + nested='false', + verify_ssl=test_config['verify_ssl'], + enable_IPv6=test_config['enableIPv6'], + additional_headers={ + "x-api-impersonate-user": granting_user + } + ) + assert list(res[0].keys())[0] == granting_user + assert res[0][granting_user] == granting_user