-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_secretsweeper.py
More file actions
218 lines (181 loc) · 8.16 KB
/
test_secretsweeper.py
File metadata and controls
218 lines (181 loc) · 8.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import io
import pathlib
import typing
import unittest
import pytest
import secretsweeper
def _generator() -> typing.Iterator[bytes]:
yield b"a"
@pytest.mark.parametrize(
("input", "patterns", "expected"),
[
("first", (), "first"), # no patterns
("second", ("",), "second"), # empty pattern
("teststring", ("string",), "test******"),
("notebook", ("note", "book"), "********"),
("news(paper)man", ("man", "news"), "****(paper)***"),
("aballsong", ("ball", "on"), "a****s**g"),
("son sings a song", ("son",), "*** sings a ***g"),
("[multi\nline]secret", ("multi\nline", "secret"), "[**********]******"),
("new\nline\n", ("line", "new"), "***\n****\n"),
("-dash-\n", ("-",), "*dash*\n"),
("repeatingpeat", ("peat", "peat"), "re****ing****"),
# Overlapping patterns
("asher", ("ash", "her", "she"), "*****"),
("qqwerty", ("qwerty",), "q******"),
("cbcbccb", ("cbccb",), "cb*****"),
(
"bcbcbccb",
(
"cbccb",
"bcbcb",
),
"********",
),
("sinto", ("sin", "into"), "*****"),
(
"smasher",
(
"ash",
"masher",
),
"s******",
),
("friendship", ("end", "ship", "friend"), "**********"),
],
)
def test_mask(input: str, patterns: typing.Iterable[str], expected: str) -> None:
assert secretsweeper.mask(input.encode(), (w.encode() for w in patterns)) == expected.encode()
@pytest.mark.parametrize(
("input", "patterns", "limit", "expected"),
[
("basketball", ("ball",), 2, "basket**"),
("smallhou\nse\n", ("hou\nse",), 2, "small**\n"),
("hellob\nunny", ("b\nunny\n",), 2, "hellob\nunny"),
("thiswasfunny\n", ("funny",), 6, "thiswas*****\n"),
("fivesix\n", ("six\n",), 0, "five"),
("seveneleven\n", ("eleven",), 6, "seven******\n"),
("line\nsecond line\n", ("ne\nsec", "second"), 6, "li****** line\n"), # overlapping patterns + max size
],
)
def test_mask_limit(input: str, patterns: typing.Iterable[str], limit: int, expected: str) -> None:
assert secretsweeper.mask(input.encode(), (w.encode() for w in patterns), limit=limit) == expected.encode()
@pytest.mark.parametrize(
("input", "patterns", "expected"),
[
("", ("",), ""),
("this is a [secret]", ("secret",), "this is a []"),
("fetch fresh fishes", ("sh",), "fetch fre fies"),
],
)
def test_sanitize(input: str, patterns: typing.Iterable[str], expected: str) -> None:
assert secretsweeper.mask(input.encode(), (w.encode() for w in patterns), limit=0) == expected.encode()
@pytest.mark.parametrize(
("input", "patterns", "expected"),
[
# Multibyte characters are replaced with 2-4 asterisks.
("давай", ("да",), "****вай"),
("тримай", ("май", "три"), "************"),
],
)
def test_mask_utf8(input: str, patterns: typing.Iterable[str], expected: str) -> None:
assert secretsweeper.mask(input.encode(), (w.encode() for w in patterns)) == expected.encode()
@pytest.mark.parametrize(
("patterns"),
[
(b"a",), # it can be a tuple
[b"a"], # it can be a list
{b"a"}, # it can be a set
{b"a": typing.Any}, # it can be a dict
(b"a" for i in range(0, 1)), # it can be a generator expression
_generator(),
],
)
def test_mask_pattern_type(patterns: typing.Iterable[bytes]) -> None:
assert secretsweeper.mask(b"a", patterns) == b"*"
def test_mask_max_number_of_stars_default() -> None:
inp = b"a" * (secretsweeper.MAX_NUMBER_OF_STARS + 1)
assert secretsweeper.mask(inp, (inp,)) == b"*" * secretsweeper.MAX_NUMBER_OF_STARS
def test_can_mask_bytearray() -> None:
assert secretsweeper.mask(bytearray(b"funny"), (b"fun",)) == b"***ny"
def test_can_mask_memory_view() -> None:
assert secretsweeper.mask(memoryview(b"funny"), (b"fun",)) == b"***ny"
def test_stream_wrapper_init_and_del() -> None:
wrapper = secretsweeper._core._StreamWrapper((b"a", b"b"))
wrapper2 = secretsweeper._core._StreamWrapper((b"a", b"b"))
assert isinstance(wrapper, secretsweeper._core._StreamWrapper)
assert isinstance(wrapper2, secretsweeper._core._StreamWrapper)
assert id(wrapper) == wrapper._id()
assert id(wrapper2) == wrapper2._id()
assert id(wrapper) != id(wrapper2)
del wrapper
del wrapper2
def test_stream_wrapper_iter() -> None:
chunk = []
with open(pathlib.Path(__file__).parent / "fixtures" / "file.txt", "rb") as f:
stream = secretsweeper.StreamWrapper(f, (b"line",))
for line in stream:
chunk.append(line)
assert b"".join(chunk) == b"first ****\nsecond ****\nthird ****\n"
def test_stream_wrapper_readall() -> None:
with open(pathlib.Path(__file__).parent / "fixtures" / "file.txt", "rb") as f:
stream = secretsweeper.StreamWrapper(f, (b"line",))
result = stream.readall()
assert result == b"first ****\nsecond ****\nthird ****\n"
def test_stream_wrapper_bytes_io() -> None:
s = io.BytesIO(initial_bytes=b"funny")
stream = secretsweeper.StreamWrapper(s, (b"fun",), limit=0)
result = stream.readall()
assert result == b"ny"
def test_stream_wrapper_read_limited_size() -> None:
def _iter() -> typing.Iterator[bytes]:
with open(pathlib.Path(__file__).parent / "fixtures" / "file.txt", "rb") as f:
stream = secretsweeper.StreamWrapper(f, (b"st line\nsecond line\nthird line\n",), limit=5)
# Note: We don't guarantee that the buffer won't exceed the provided size.
while buf := stream.read(size=3):
yield buf
assert b"".join(_iter()) == b"fir*****"
@pytest.mark.parametrize(
("fixture_file", "patterns", "limit", "expected"),
[
("file", (b"line\nthird",), None, b"first line\nsecond ********** line\n"),
# overlapping multiline pattern.
("file", (b"ne\nse", b"second"), 3, b"first li*** line\nthird line\n"),
# the first pattern is near the limit and next overlapping pattern is less than the limit.
("file", (b"ne\nse", b"second"), 4, b"first li**** line\nthird line\n"),
# multiline pattern for more than two lines.
("file", (b"st line\nsecond line\nthird ",), 1, b"fir*line\n"),
# multiline pattern for more than two lines up to the end of the input.
("file", (b"st line\nsecond line\nthird line\n",), 1, b"fir*"),
# removing overlapping patterns.
("file", (b"third", b"ne\n", b"e\n", b" line\n", b"st line\nsecond line\nth"), 0, b"fir"),
# non overlapping - 1 asterisks for every pattern.
("file", (b"first line\n", b"second line\n", b"third line\n"), 1, b"***"),
# overlapping - 1 asterisks in total for all patterns.
("file", (b"first line\ns", b"second line\nt", b"third line\n"), 1, b"*"),
],
)
def test_stream_wrapper(
fixture_file: str, patterns: typing.Iterable[bytes], expected: bytes, limit: None | int
) -> None:
if limit is None:
limit = secretsweeper.MAX_NUMBER_OF_STARS
chunk = []
with open(pathlib.Path(__file__).parent / "fixtures" / f"{fixture_file}.txt", "rb") as f:
stream = secretsweeper.StreamWrapper(f, patterns, limit=limit)
for line in stream:
chunk.append(line)
assert b"".join(chunk) == expected
class InvalidInputTest(unittest.TestCase):
def test_mask_error_input(self) -> None:
with self.assertRaises(TypeError) as ex:
secretsweeper.mask(0, ()) # type: ignore
self.assertIn("expected bytes, memoryview or bytearray, found <class 'int'>", str(ex.exception))
def test_mask_error_patterns(self) -> None:
with self.assertRaises(TypeError) as ex:
secretsweeper.mask(b"", -1) # type: ignore
self.assertIn("'int' object is not iterable", str(ex.exception))
def test_mask_bytes_io_input(self) -> None:
with self.assertRaises(TypeError) as ex:
secretsweeper.mask(io.BytesIO(initial_bytes=b""), ()) # type: ignore
self.assertIn("You can use the StreamWrapper class for such purposes.", str(ex.exception))