forked from gpsaggese/gpsaggese.github.io
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathencrypt.py
More file actions
executable file
·346 lines (297 loc) · 10.1 KB
/
encrypt.py
File metadata and controls
executable file
·346 lines (297 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
#!/usr/bin/env python
"""
Encrypt or decrypt a directory using Fernet symmetric encryption.
This script processes .md, .txt, and .py files in a directory, encrypting or
decrypting them using the VIM_SECRET environment variable.
Examples:
# Encrypt a directory (creates DIR.secret and removes DIR)
> encrypt.py --input_dir data605/lectures_quizzes
# Encrypt a directory but keep the original
> encrypt.py --input_dir data605/lectures_quizzes --keep_old_dir
# Decrypt a directory (creates DIR from DIR.secret)
> encrypt.py --input_dir data605/lectures_quizzes --decrypt
Import as:
import encrypt as encrypt
"""
import argparse
import base64
import hashlib
import logging
import os
import shlex
import shutil
from pathlib import Path
from cryptography.fernet import Fernet
import helpers.hdbg as hdbg
import helpers.hparser as hparser
import helpers.hsystem as hsystem
# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)
# Valid file extensions to process (compared against `Path.suffix`, so the
# leading dot is part of each entry).
_VALID_EXTENSIONS = (".md", ".txt", ".py")
# Encryption header to identify encrypted files: it is written at the start of
# every encrypted file and checked before encrypting or decrypting.
_ENCRYPTION_HEADER = b"FERNET_ENCRYPTED_V1\n"
# #############################################################################
def _get_secret() -> str:
    """
    Read the encryption password from the environment.

    :return: the value of the `VIM_SECRET` environment variable
    """
    value = os.getenv("VIM_SECRET")
    # The variable must be present and non-empty.
    hdbg.dassert(
        value,
        "VIM_SECRET environment variable is not set or is empty",
    )
    return value
def _derive_key(password: str) -> bytes:
    """
    Turn a password into a key in the format Fernet expects.

    The password is encoded to bytes and hashed with SHA256, which yields 32
    bytes; the digest is then encoded with URL-safe base64.

    :param password: the password to derive the key from
    :return: URL-safe base64 encoding of the 32-byte SHA256 digest
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    digest = hasher.digest()
    return base64.urlsafe_b64encode(digest)
def _should_process_file(file_path: Path) -> bool:
    """
    Decide whether a file is eligible for encryption / decryption.

    :param file_path: path to the file
    :return: True if the file suffix is one of `_VALID_EXTENSIONS`
    """
    extension = file_path.suffix
    return any(extension == valid for valid in _VALID_EXTENSIONS)
def _is_file_encrypted(file_path: Path) -> bool:
    """
    Check if a file is already encrypted by looking for the encryption header.

    :param file_path: path to the file
    :return: True if the first bytes of the file equal `_ENCRYPTION_HEADER`;
        False otherwise, including when the file cannot be read
    """
    try:
        # Only the leading bytes are needed to recognize the header.
        with open(file_path, "rb") as f:
            header = f.read(len(_ENCRYPTION_HEADER))
    except OSError as e:
        # Best effort: an unreadable file is reported as not encrypted. Only
        # I/O errors are swallowed, so programming errors (e.g. a wrong
        # argument type) still surface.
        _LOG.warning("Could not read file %s: %s", file_path, e)
        return False
    return header == _ENCRYPTION_HEADER
def _encrypt_file(file_path: Path, secret: str) -> None:
    """
    Encrypt one file in place with Fernet.

    A file that already starts with the encryption header is left untouched.

    :param file_path: path to the file to encrypt
    :param secret: password from which the Fernet key is derived
    """
    # Nothing to do for a file that is already encrypted.
    if _is_file_encrypted(file_path):
        _LOG.debug("Skipping already encrypted file: %s", file_path)
        return
    _LOG.debug("Encrypting file: %s", file_path)
    # Load the plaintext.
    with open(file_path, "rb") as src:
        plaintext = src.read()
    # Build the cipher from the password-derived key and encrypt.
    token = Fernet(_derive_key(secret)).encrypt(plaintext)
    # Overwrite the file with the header followed by the encrypted payload.
    with open(file_path, "wb") as dst:
        dst.write(_ENCRYPTION_HEADER + token)
def _decrypt_file(file_path: Path, secret: str) -> None:
    """
    Decrypt a single file in place using Fernet decryption.

    The file must start with the encryption header; the bytes after the
    header are treated as the Fernet token.

    :param file_path: path to the file to decrypt
    :param secret: password from which the Fernet key is derived
    """
    # Local import: only this function needs the exception type.
    from cryptography.fernet import InvalidToken

    _LOG.debug("Decrypting file: %s", file_path)
    # Read the encrypted file.
    with open(file_path, "rb") as f:
        content = f.read()
    # Check and remove header.
    hdbg.dassert(
        content.startswith(_ENCRYPTION_HEADER),
        "File is not encrypted with expected format:",
        file_path,
    )
    encrypted = content[len(_ENCRYPTION_HEADER) :]
    # Derive decryption key.
    key = _derive_key(secret)
    cipher = Fernet(key)
    # Decrypt the content.
    try:
        plaintext = cipher.decrypt(encrypted)
    except InvalidToken as e:
        # Report the failure as one pre-formatted message, so the call does
        # not depend on how `hdbg.dfatal` treats extra positional arguments.
        # NOTE(review): `dfatal` is presumed to raise -- confirm against
        # `helpers.hdbg`.
        hdbg.dfatal(
            f"Failed to decrypt file (wrong password?): {file_path}: {e}"
        )
        # Defensive: never fall through to the write below without plaintext.
        raise
    # Write decrypted content.
    with open(file_path, "wb") as f:
        f.write(plaintext)
def _make_readonly(dir_path: Path) -> None:
    """
    Recursively remove write permission from a directory tree.

    :param dir_path: path to the directory
    """
    _LOG.debug("Making directory read-only: %s", dir_path)
    # Quote the path before embedding it in the command string.
    chmod_cmd = "chmod -R -w " + shlex.quote(str(dir_path))
    hsystem.system(chmod_cmd)
def _encrypt_directory(
    *,
    input_dir: str,
    keep_old_dir: bool,
) -> None:
    """
    Encrypt a directory by creating a .secret copy with encrypted files.

    The source directory is copied to `<input_dir>.secret`, every file with a
    valid extension in the copy is encrypted in place, and the copy is then
    made read-only. An existing `<input_dir>.secret` is replaced.

    :param input_dir: source directory to encrypt; a trailing path separator
        is ignored
    :param keep_old_dir: if True, keep the original directory, otherwise
        remove it once the encrypted copy is complete
    """
    _LOG.info("Starting encryption of directory: %s", input_dir)
    # Get the secret.
    secret = _get_secret()
    # Set up paths.
    src_path = Path(input_dir)
    hdbg.dassert(
        src_path.exists(),
        "Source directory does not exist:",
        input_dir,
    )
    hdbg.dassert(
        src_path.is_dir(),
        "Source path is not a directory:",
        input_dir,
    )
    # Build the destination from the normalized source path: `Path` drops a
    # trailing separator, so `dir/` maps to `dir.secret` rather than to
    # `dir/.secret`, which would sit inside the tree being copied (and later
    # removed together with the source).
    dst_path = Path(f"{src_path}.secret")
    # Remove existing destination if it exists.
    if dst_path.exists():
        _LOG.info("Removing existing destination: %s", dst_path)
        # Make files writable before deletion in case they were read-only.
        quoted_dst_path = shlex.quote(str(dst_path))
        cmd = f"chmod -R +w {quoted_dst_path}"
        hsystem.system(cmd)
        shutil.rmtree(dst_path)
    # Copy the directory.
    _LOG.info("Copying directory to: %s", dst_path)
    shutil.copytree(src_path, dst_path)
    # Make files writable before encryption.
    quoted_path = shlex.quote(str(dst_path))
    cmd = f"chmod -R +w {quoted_path}"
    hsystem.system(cmd)
    # Find and encrypt all valid files. The count covers every file handed to
    # `_encrypt_file`, including those it skips as already encrypted.
    file_count = 0
    for file_path in dst_path.rglob("*"):
        if file_path.is_file() and _should_process_file(file_path):
            _encrypt_file(file_path, secret)
            file_count += 1
    _LOG.info("Encrypted %d files", file_count)
    # Make files read-only.
    _make_readonly(dst_path)
    # Remove source directory if requested.
    if not keep_old_dir:
        _LOG.info("Removing source directory: %s", src_path)
        # Make files writable before deletion in case they were read-only.
        quoted_src_path = shlex.quote(str(src_path))
        cmd = f"chmod -R +w {quoted_src_path}"
        hsystem.system(cmd)
        shutil.rmtree(src_path)
    _LOG.info("Encryption complete")
def _decrypt_directory(
    *,
    input_dir: str,
    keep_old_dir: bool,
) -> None:
    """
    Decrypt a .secret directory by creating the original directory.

    `<input_dir>.secret` is copied to `<input_dir>` and every file with a
    valid extension in the copy is decrypted in place. An existing
    `<input_dir>` is replaced.

    :param input_dir: base name of the directory (without .secret); a
        trailing path separator is ignored
    :param keep_old_dir: if True, keep the .secret directory, otherwise
        remove it once decryption is complete
    """
    _LOG.info("Starting decryption of directory: %s", input_dir)
    # Get the secret.
    secret = _get_secret()
    # Set up paths.
    dst_path = Path(input_dir)
    # Build the source from the normalized destination path: `Path` drops a
    # trailing separator, so `dir/` maps to `dir.secret` rather than to
    # `dir/.secret`.
    src_path = Path(f"{dst_path}.secret")
    hdbg.dassert(
        src_path.exists(),
        "Source .secret directory does not exist:",
        src_path,
    )
    hdbg.dassert(
        src_path.is_dir(),
        "Source path is not a directory:",
        src_path,
    )
    # Remove existing destination if it exists.
    if dst_path.exists():
        _LOG.info("Removing existing destination: %s", dst_path)
        # Make files writable before deletion in case they were read-only.
        quoted_dst_path = shlex.quote(str(dst_path))
        cmd = f"chmod -R +w {quoted_dst_path}"
        hsystem.system(cmd)
        shutil.rmtree(dst_path)
    # Copy the directory.
    _LOG.info("Copying directory to: %s", dst_path)
    shutil.copytree(src_path, dst_path)
    # Make files writable before decryption (the .secret tree is stored
    # read-only by the encryption step).
    quoted_path = shlex.quote(str(dst_path))
    cmd = f"chmod -R +w {quoted_path}"
    hsystem.system(cmd)
    # Find and decrypt all valid files.
    file_count = 0
    for file_path in dst_path.rglob("*"):
        if file_path.is_file() and _should_process_file(file_path):
            _decrypt_file(file_path, secret)
            file_count += 1
    _LOG.info("Decrypted %d files", file_count)
    # Remove source directory if requested.
    if not keep_old_dir:
        _LOG.info("Removing source .secret directory: %s", src_path)
        # Make files writable before deletion.
        quoted_src_path = shlex.quote(str(src_path))
        cmd = f"chmod -R +w {quoted_src_path}"
        hsystem.system(cmd)
        shutil.rmtree(src_path)
    _LOG.info("Decryption complete")
def _parse() -> argparse.ArgumentParser:
    """
    Build the command-line parser for the script.

    :return: parser with the `--input_dir`, `--keep_old_dir` and `--decrypt`
        options, plus the verbosity option added by `hparser`
    """
    cli = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Directory to operate on.
    cli.add_argument(
        "--input_dir",
        required=True,
        help="Directory to encrypt or decrypt (base name without .secret)",
    )
    # Boolean switches, both off unless passed.
    cli.add_argument(
        "--keep_old_dir",
        action="store_true",
        help="Keep the original directory after encryption/decryption",
    )
    cli.add_argument(
        "--decrypt",
        action="store_true",
        help="Decrypt from .secret directory instead of encrypting",
    )
    hparser.add_verbosity_arg(cli)
    return cli
def _main(parser: argparse.ArgumentParser) -> None:
    """
    Parse the command line and run the requested operation.

    :param parser: parser used to read the command-line arguments
    """
    cli_args = parser.parse_args()
    hdbg.init_logger(verbosity=cli_args.log_level, use_exec_path=True)
    # Pick decryption when `--decrypt` is passed, encryption otherwise.
    operation = _decrypt_directory if cli_args.decrypt else _encrypt_directory
    operation(
        input_dir=cli_args.input_dir,
        keep_old_dir=cli_args.keep_old_dir,
    )
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    cli_parser = _parse()
    _main(cli_parser)