forked from BhanuPraharsha/SecureSync
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_integration.py
More file actions
84 lines (71 loc) · 2.89 KB
/
test_integration.py
File metadata and controls
84 lines (71 loc) · 2.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import json
import os
import time
from pathlib import Path

import requests

from app.crypto import CryptoManager
from app.hmac_derivation import HMACKeyDerivation  # ✅ NEW
# Endpoint and key locations used by the integration check. Each value can be
# overridden through an environment variable so the script is not tied to one
# machine's drive layout; the defaults are the original hard-coded values.
API_URL = os.environ.get(
    "SECURESYNC_API_URL", "http://127.0.0.1:8000/api/v1/sync"
)
SERVER_PUB = os.environ.get(
    "SECURESYNC_SERVER_PUB", "d:/SecureVault/app/keys/server/server_public.pem"
)
PLANT_PUB = os.environ.get(
    "SECURESYNC_PLANT_PUB", "d:/SecureVault/app/keys/plants/plant_a_public.pem"
)
PLANT_PRIV = os.environ.get(
    "SECURESYNC_PLANT_PRIV", "d:/SecureVault/app/keys/plants/plant_a_private.pem"
)
# Seconds to wait on each HTTP call. Without an explicit timeout `requests`
# may block indefinitely when the server accepts a connection but never answers.
_REQUEST_TIMEOUT_S = 10


def test_sync():
    """Run one end-to-end sync against the API as ``plant_a``.

    Steps: log in at the ``/token`` endpoint, encrypt, sign and HMAC a sample
    production-order payload, then POST it to ``API_URL`` with the bearer
    token. Progress and the final outcome are printed.

    Returns:
        None. Login failures, network errors and malformed responses are
        reported on stdout and end the run early instead of raising.
        Exceptions from ``CryptoManager`` / ``HMACKeyDerivation`` are not
        caught here and propagate to the caller.
    """
    print("Initializing Plant Client...")

    # 0. Login
    print("Logging in as plant_a...")
    # The token endpoint lives on the same host as the sync endpoint.
    token_url = API_URL.replace('/api/v1/sync', '/token')
    try:
        auth_res = requests.post(
            token_url,
            data={"username": "plant_a", "password": "plant123"},
            timeout=_REQUEST_TIMEOUT_S,
        )
    except requests.RequestException as e:
        print(f"Login Connection Failed: {e}")
        return

    if auth_res.status_code != 200:
        print(f"FAILED: Login failed {auth_res.text}")
        return

    # A 200 whose body is not JSON, or lacks the token, is a server-side
    # problem rather than a connection problem, so report it as such.
    try:
        token = auth_res.json()["access_token"]
    except (ValueError, KeyError, TypeError) as e:
        print(f"FAILED: Login response had no usable access_token: {e!r}")
        return
    print("Login Success!")

    crypto = CryptoManager(private_key_path=PLANT_PRIV, public_key_path=PLANT_PUB)
    payload_data = {
        "order_id": "ORD-999",
        "production_line": "Line-1",
        "status": "COMPLETED",
        "quantity": 500,
        "meta_data": {"temp": "45C"},
    }

    # 1. Encrypt, Sign & HMAC
    print("Encrypting, Signing and generating HMAC...")
    encrypted_payload_dict = crypto.encrypt_payload(payload_data, SERVER_PUB)
    signature = crypto.sign_data(payload_data)

    # Derive the HMAC secret from the AES key + plant_id + nonce.
    # Intended formula (per the original author's note):
    #   hmac_key = HKDF(aes_key || plant_id || nonce)
    # NOTE(review): this reads 'aes_key' and 'nonce' from the encrypt_payload
    # result while the request below sends 'iv' -- confirm encrypt_payload
    # really returns all of these keys.
    hmac_secret = HMACKeyDerivation.derive_hmac_secret(
        encrypted_payload_dict['aes_key'],  # AES session key
        "plant_a",                          # Plant ID
        encrypted_payload_dict['nonce'],    # IV/nonce
    )
    hmac_value = crypto.generate_hmac(payload_data, hmac_secret)
    # NOTE(review): this prints a prefix of the derived secret; acceptable for
    # a local debugging run, but avoid it anywhere output is logged or shared.
    print(f" Using AES-derived HMAC secret: {hmac_secret.hex()[:32]}...")

    request_payload = {
        "plant_id": "plant_a",
        "encrypted_data": encrypted_payload_dict['encrypted_data'],
        "encrypted_key": encrypted_payload_dict['encrypted_key'],
        "iv": encrypted_payload_dict['iv'],
        "tag": encrypted_payload_dict['tag'],
        "signature": signature,
        "timestamp": str(time.time()),
        "hmac": hmac_value,  # HMAC keyed with the AES-derived secret
        "priority": "normal",
    }

    # 2. Send to API
    print("Sending to API...")
    headers = {"Authorization": f"Bearer {token}"}
    try:
        response = requests.post(
            API_URL,
            json=request_payload,
            headers=headers,
            timeout=_REQUEST_TIMEOUT_S,
        )
    except requests.RequestException as e:
        print(f"Connection Failed: {e}")
        return

    print(f"Status Code: {response.status_code}")
    # Error pages are not always JSON; fall back to the raw text so the
    # status outcome below is still reported.
    try:
        response_body = response.json()
    except ValueError:
        response_body = response.text
    print(f"Response: {response_body}")

    if response.status_code == 200:
        print("SUCCESS: Data synced successfully.")
    else:
        print("FAILED: API rejected request.")
# Script entry point: perform the sync check only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    test_sync()