-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream.py
More file actions
40 lines (25 loc) · 890 Bytes
/
stream.py
File metadata and controls
40 lines (25 loc) · 890 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from typing import Callable, Generator
__all__ = ["stream", "read", "close", "setEvent", "dynamic_stream", "exhaust", "new"]
exit_event: int | bool = False
type stream[OutputType] = Generator[OutputType, None, None]
def read[T](inp: stream[T], amount: int = 1) -> list[T]:
result: list[T] = []
for _ in range(amount):
result.append(next(inp))
return result
def close[T](inp: stream[T]):
for _ in inp:
pass
def setEvent(code: int | bool = True):
global exit_event
exit_event = code
def dynamic_stream[ReturnType](func: Callable):
def wrapper(*args, **kwargs) -> stream[ReturnType]:
while not exit_event:
yield func(*args, **kwargs)
return wrapper()
def exhaust[T](inp: stream[T], amount: int = 1):
for _ in range(amount):
read(inp)
def new[T](output: list[T]) -> stream[T]:
yield from output