-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlfs_lock_parser.py
More file actions
150 lines (118 loc) · 4.99 KB
/
lfs_lock_parser.py
File metadata and controls
150 lines (118 loc) · 4.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
""" This module implements a parser for Git LFS lock data and related types. """
import os
import dataclasses
import utility
from worker_thread import WorkerThread
@dataclasses.dataclass
class LfsLockData:
"""
This type implements all data which can be retrieved from Git LFS file locks.
"""
lock_id: int
lock_owner: str
relative_path: str
# True, if file exists locally and was never pushed to a remote branch
is_orphaned: bool
# True, if file exists locally
is_local_file: bool
class LfsLockParser:
"""
This class provides functions to asynchronously parse LFS lock data generated by 'git-lfs
locks'.
"""
lock_data = []
lock_owners = []
subscribers = []
has_parsed_once = False
@staticmethod
def get_lock_owner_of_file(relative_file_path: str):
"""
This helper function retrieves the owner of a given file.
:param relative_file_path: The relative file path of a locked file
:return: The owner of the file
"""
lock_data = LfsLockParser.get_lock_data_of_file(relative_file_path)
if lock_data is not None:
return lock_data.lock_owner
return ""
@staticmethod
def get_lock_data_of_file(relative_file_path: str):
"""
This helper function retrieves the stored lock data of a given file.
:param relative_file_path: The relative file path of a locked file
:return: The lock data of the file
"""
unquoted_file_path = relative_file_path.replace('"', '')
for lock_data in LfsLockParser.lock_data:
if isinstance(lock_data, LfsLockData):
if lock_data.relative_path == unquoted_file_path:
return lock_data
return ""
@staticmethod
def get_lock_owners(lock_data: [LfsLockData]):
"""
This function retrieves all unique LFS lock owners, i.e. all users who own file locks.
:param lock_data: The LFS lock data to use
:return: Returns all unique LFS lock owners
"""
owners = set()
for data in lock_data:
owners.add(data.lock_owner)
return list(owners)
@staticmethod
def subscribe_to_update(subscriber):
"""
This function is used to have an object subscribe to parsing updates of this class.
:param subscriber: The object that wants to be notified
"""
LfsLockParser.subscribers.append(subscriber)
@staticmethod
def _on_parsing_failed(error):
print("LFS lock parser failed: " + error)
@staticmethod
def _notify_subscribers():
# Only notify if we actually parsed any data successfully
if LfsLockParser.has_parsed_once:
print(f"Notifying {len(LfsLockParser.subscribers)} subscribers.")
for subscriber in LfsLockParser.subscribers:
subscriber.on_lock_data_update()
else:
print("Failed to notify %i subscribers because there is no valid data.")
@staticmethod
def parse_locks_async():
"""
Asynchronously parses LFS lock data. Once finished, all subscribers will be notified.
"""
worker = WorkerThread(LfsLockParser._parse_locks)
worker.exec(LfsLockParser._notify_subscribers, LfsLockParser._on_parsing_failed)
@staticmethod
def _parse_locks():
if not utility.is_git_installed() or not utility.is_git_lfs_installed():
print("Failed to parse LFS data. Dependencies are missing.")
return
print("Parsing locks")
# Get the lines of the output as a list
project_root = utility.get_project_root_directory()
command = [utility.get_git_lfs_path(), 'locks']
lines = utility.run_command_and_output_list_of_lines(command, project_root)
data = []
for line in lines:
# Split the data string into components
components = line.split()
# Extract the relevant information
file_path = components[0]
lock_owner = components[1]
lock_id = components[2].split(":")[1] # Extract the number part after "ID:"
# Does the file exists locally?
is_local_file = os.path.isfile(project_root + file_path)
# Is the file an orphaned file, meaning it does not exist anywhere on the remote?
is_orphaned = utility.is_file_orphaned(file_path) and is_local_file
# if is_orphaned:
# print("File '%s' is orphaned." % file_path)
data.append(LfsLockData(lock_id, lock_owner, file_path, is_orphaned, is_local_file))
# Keep a copy of the parsed data
LfsLockParser.lock_data = data
# Cache unique lock owners
LfsLockParser.lock_owners = LfsLockParser.get_lock_owners(data)
LfsLockParser.has_parsed_once = True
print("Finished parsing locks")