-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathget_pipeline_tools_changes.py
More file actions
121 lines (99 loc) · 4.34 KB
/
get_pipeline_tools_changes.py
File metadata and controls
121 lines (99 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import json
import datetime
import argparse
import os
from dateutil.parser import parse
from utils.get_prisma_token import get_auth_token
from utils.get_pipeline_tools import get_pipeline_tools
from utils.get_repo import get_repo_scanned
import sqlite3
import time
# SQLite file that stores the pipeline-state snapshots. The path is relative,
# so it resolves against the current working directory of the process.
DATABASE_FILE = 'prisma_pipeline_states.db'
def init_db():
    """Create the ``states`` table in ``DATABASE_FILE`` if it is missing.

    The table has two columns: ``timestamp`` (INTEGER PRIMARY KEY, Unix
    seconds) and ``state`` (TEXT, a JSON document).

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement fails.
    """
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # NOT close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute('''CREATE TABLE IF NOT EXISTS states
                            (timestamp INTEGER PRIMARY KEY, state TEXT)''')
    finally:
        conn.close()
def save_state(state):
    """Persist ``state`` as a JSON snapshot keyed by the current Unix second.

    Args:
        state: A JSON-serializable object (``main`` passes a dict mapping
            ``appName`` to the pipeline record).

    Raises:
        TypeError: If ``state`` is not JSON-serializable.
        sqlite3.Error: If the database cannot be opened or written.
    """
    # Serialize before opening the database so a bad payload never touches it.
    payload = json.dumps(state)
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # NOT close the connection, hence the surrounding try/finally.
        with conn:
            # The primary key has one-second resolution. OR REPLACE keeps the
            # newest snapshot instead of raising IntegrityError when two saves
            # land in the same second.
            conn.execute(
                "INSERT OR REPLACE INTO states (timestamp, state) VALUES (?, ?)",
                (int(time.time()), payload),
            )
    finally:
        conn.close()
def load_states(days=7):
    """Return the snapshots recorded during the last ``days`` days.

    Args:
        days: Size of the look-back window in days (default 7).

    Returns:
        dict: Maps each snapshot's Unix timestamp to its decoded JSON state,
        inserted newest first.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried.
        json.JSONDecodeError: If a stored state is not valid JSON.
    """
    cutoff = int(time.time()) - (days * 86400)
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        # Name the columns explicitly so the row unpacking below cannot drift
        # if the table ever gains a column.
        rows = conn.execute(
            "SELECT timestamp, state FROM states WHERE timestamp >= ? ORDER BY timestamp DESC",
            (cutoff,),
        ).fetchall()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()
    return {snapshot_time: json.loads(blob) for snapshot_time, blob in rows}
def cleanup_old_states(days=30):
    """Delete snapshots older than ``days`` days from the ``states`` table.

    Args:
        days: Retention window in days (default 30). Rows whose timestamp is
            strictly older than ``now - days`` are removed.

    Raises:
        sqlite3.Error: If the database cannot be opened or written.
    """
    cutoff = int(time.time()) - (days * 86400)
    conn = sqlite3.connect(DATABASE_FILE)
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # NOT close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute("DELETE FROM states WHERE timestamp < ?", (cutoff,))
    finally:
        conn.close()
def compare_states(previous_state, current_state):
    """Diff two state mappings by key.

    Args:
        previous_state: The older mapping.
        current_state: The newer mapping.

    Returns:
        tuple: ``(added, removed, modified)`` lists of keys. ``added`` holds
        keys only in ``current_state``, ``removed`` holds keys only in
        ``previous_state``, and ``modified`` holds keys present in both whose
        values compare unequal. ``added`` and ``modified`` follow the
        iteration order of ``current_state``; ``removed`` follows that of
        ``previous_state``.
    """
    added = []
    modified = []
    for key in current_state:
        if key not in previous_state:
            added.append(key)
        elif current_state[key] != previous_state[key]:
            modified.append(key)

    removed = [key for key in previous_state if key not in current_state]
    return added, removed, modified
def match_repos_with_apps(repositories, pipelines):
    """Group pipeline app names under the repository they belong to.

    Args:
        repositories: Iterable of dicts, each with ``'id'`` and ``'name'``.
        pipelines: Iterable of dicts. A pipeline is attached to a repository
            when its ``'casId'`` equals that repository's ``'id'``; its
            ``'appName'`` is then recorded.

    Returns:
        dict: Maps repository id to ``{'name': <repo name>, 'apps': [...]}``,
        where ``apps`` lists the matching app names in pipeline order.
        Pipelines without a matching repository are ignored.
    """
    repo_index = {}
    for repository in repositories:
        repo_index[repository['id']] = {'name': repository['name'], 'apps': []}

    for entry in pipelines:
        # Entries are always dicts, so None reliably means "no such repository".
        matched = repo_index.get(entry.get('casId'))
        if matched is None:
            continue
        matched['apps'].append(entry['appName'])

    return repo_index
def main():
    """Report Prisma Cloud pipeline tools and how they changed between runs.

    Reads ``PRISMA_API_URL``, ``PRISMA_ACCESS_KEY`` and ``PRISMA_SECRET_KEY``
    from the environment, authenticates, and fetches the pipeline tools.

    * With ``--show``: prints every scanned repository together with the
      ``appName`` of each pipeline whose ``casId`` matches the repository id.
    * Without ``--show``: prints each pipeline, diffs the current set against
      every snapshot stored during the last 7 days, records the current set as
      a new snapshot, and prunes snapshots older than 30 days.

    Raises:
        ValueError: If any of the required environment variables is unset.
    """
    init_db()
    print(f"Database initialized at: {os.path.abspath(DATABASE_FILE)}")

    parser = argparse.ArgumentParser(description="List pipeline_tools in Prisma Cloud tenant last scanned before a given date.")
    parser.add_argument("--show", action="store_true", help="Show all repositories and their associated appNames")
    args = parser.parse_args()

    api_url = os.environ.get('PRISMA_API_URL')
    username = os.environ.get('PRISMA_ACCESS_KEY')
    password = os.environ.get('PRISMA_SECRET_KEY')
    if not all([api_url, username, password]):
        raise ValueError("One or more required environment variables are not set. Please set PRISMA_API_URL, PRISMA_ACCESS_KEY, and PRISMA_SECRET_KEY.")

    auth_token = get_auth_token(api_url, username, password)
    pipelines = get_pipeline_tools(api_url, auth_token)

    if args.show:
        repositories = get_repo_scanned(api_url, auth_token)
        # ``pipelines or []`` keeps the listing working when no pipelines
        # came back (repositories are then shown without apps).
        repo_app_map = match_repos_with_apps(repositories, pipelines or [])
        print("Repositories and their associated appNames:")
        for repo_id, repo_data in repo_app_map.items():
            print(f"\nRepository: {repo_data['name']} (ID: {repo_id})")
            if repo_data['apps']:
                for app in repo_data['apps']:
                    print(f" - {app}")
            else:
                print(" No associated appNames found")
        # NOTE(review): snapshot tracking below is kept out of the --show
        # path, mirroring the original ``else:`` branch — confirm this nesting.
        return

    if pipelines:
        for pipeline in pipelines:
            print(pipeline)
    else:
        print("No pipeline CI files were found or an error occurred.")

    if pipelines is None:
        # Presumably get_pipeline_tools returns None on failure (the message
        # above suggests so) — TODO confirm. There is nothing to snapshot, and
        # iterating None below would raise TypeError.
        return

    current_state = {pipeline['appName']: pipeline for pipeline in pipelines}

    # Load history BEFORE saving the new snapshot; otherwise the snapshot just
    # written is compared with itself and yields an always-empty report.
    previous_states = load_states()
    save_state(current_state)

    for timestamp, state in previous_states.items():
        print(f"\nChanges since {datetime.datetime.fromtimestamp(timestamp)}:")
        added, removed, modified = compare_states(state, current_state)
        print("Added pipelines:")
        for pipeline in added:
            print(f" - {pipeline}")
        print("\nRemoved pipelines:")
        for pipeline in removed:
            print(f" - {pipeline}")
        print("\nModified pipelines:")
        for pipeline in modified:
            print(f" - {pipeline}")

    cleanup_old_states()
# Run the report only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()