-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmain.py
More file actions
257 lines (219 loc) · 8.95 KB
/
main.py
File metadata and controls
257 lines (219 loc) · 8.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import sys
import time
from pathlib import Path
import numpy as np
from src.catcher import capturebluetooth
from src.decoder import decode_flac_file
from src.encoder import BAUD as DEFAULT_BAUD, encode_file
from src.fsk import F0, F1, M as DEFAULT_M
from src.qos import RS_NSYM as DEFAULT_RS_NSYM
EXIT_SUCCESS: int = 0
EXIT_FAILURE: int = 1
EXIT_INTERRUPT: int = 130
def _build_freqs(args: argparse.Namespace) -> list[float]:
m = getattr(args, "m", DEFAULT_M)
f0 = getattr(args, "f0", F0)
f1 = getattr(args, "f1", F1)
if m == 1:
return [f0, f1] # BFSK fallback
return list(np.linspace(f0, f1, m))
def _fmt_size(n: int) -> str:
if n < 1024:
return f"{n} B"
if n < 1024 ** 2:
return f"{n / 1024:.1f} KB"
return f"{n / 1024**2:.2f} MB"
def _fmt_duration(s: float) -> str:
m = int(s) // 60
sec = s - m * 60
return f"{m}m {sec:.1f}s" if m else f"{sec:.1f}s"
def cmd_encode(args: argparse.Namespace) -> int:
print(f"\n[ENCODE] {args.input}")
input_p = Path(args.input)
if not input_p.exists():
print(f"ERROR: file not found: {input_p}", file=sys.stderr)
return EXIT_FAILURE
output_p = Path(args.output) if args.output else input_p.with_suffix('.flac')
print(f" Input : {input_p} ({_fmt_size(input_p.stat().st_size)})")
print(f" Output : {output_p}")
import math
freqs = _build_freqs(args)
bps = int(round(math.log2(len(freqs))))
eff_br = args.baud * bps
print(f" Baud : {args.baud} sym/s | {len(freqs)}-FSK ({bps} bit/sym) "
f"=> {eff_br} bit/s effective")
print(f" Tones : {[round(f) for f in freqs]} Hz")
print(f" RS ECC : {args.rs_nsym} bytes/frame")
print()
t0 = time.perf_counter()
stats = encode_file(
input_path = input_p,
output_path = output_p,
baud = args.baud,
rs_nsym = args.rs_nsym,
freqs = freqs,
)
elapsed = time.perf_counter() - t0
print(f" Frames : {stats['n_frames']}")
print(f" Effective baud : {stats['eff_bitrate']} bit/s")
print(f" Audio duration : {_fmt_duration(stats['audio_duration_s'])}")
print(f" FLAC size : {_fmt_size(stats['flac_size_bytes'])}")
print(f" Encoded in : {elapsed:.2f} s")
print()
if args.play:
dev = args.device if hasattr(args, "device") else None
print(f"Playing on device: {dev or 'system default'}")
return EXIT_SUCCESS
def cmd_decode(args: argparse.Namespace) -> int:
if not args.input and not args.loopback:
print(
"ERROR: specify --input <file.flac> or --loopback --duration <s>",
file=sys.stderr,
)
return EXIT_FAILURE
output_p = Path(args.output) if args.output else None
t0 = time.perf_counter()
if args.input:
# Decode from FLAC file
input_p = Path(args.input)
if not input_p.exists():
print(f"ERROR: file not found: {input_p}", file=sys.stderr)
return EXIT_FAILURE
print(f" Source : {input_p}")
if output_p:
print(f" Output : {output_p}")
else:
print(" Output : (from embedded filename metadata)")
print()
stats = decode_flac_file(
input_path=input_p,
output_path=output_p,
baud=args.baud,
rs_nsym=args.rs_nsym,
freqs=_build_freqs(args),
verbose=True,
)
elapsed = time.perf_counter() - t0
m = stats["metrics"]
print()
print(m.report())
print(
f"Output file: {stats['output_file']} "
f"({_fmt_size(stats['file_size_bytes'])})"
)
print(f"Decoded in: {elapsed:.2f}s")
return EXIT_SUCCESS
def _add_codec_args(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
'--baud', type=int, default=DEFAULT_BAUD,
metavar='N',
help=f'Symbol rate in symbols/s (default: {DEFAULT_BAUD}). '
'Higher = faster but less robust over BT.',
)
parser.add_argument(
'--rs-nsym', type=int, default=DEFAULT_RS_NSYM,
metavar='N', dest='rs_nsym',
help=f'Reed-Solomon ECC bytes per frame (default: {DEFAULT_RS_NSYM}). '
'Must be even; higher = more error correction, more overhead.',
)
parser.add_argument(
'--m', type=int, default=DEFAULT_M, choices=[2, 4, 8],
metavar='M',
help='Number of FSK tones: 2=BFSK (1 bit/sym), 4=4FSK (2 bit/sym, default), '
'8=8FSK (3 bit/sym, least robust).',
)
parser.add_argument(
'--f0', type=float, default=F0,
metavar='HZ',
help=f'Lowest FSK tone frequency, Hz (default: {F0})',
)
parser.add_argument(
'--f1', type=float, default=F1,
metavar='HZ',
help=f'Highest FSK tone frequency, Hz (default: {F1})',
)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog='main.py',
description='BT-Audio File Codec -- encode any file to a FLAC '
'transmission and decode it back.',
formatter_class=argparse.RawDescriptionHelpFormatter,
)
sub = parser.add_subparsers(dest='command', required=True)
bluetooth = sub.add_parser('bluetooth', help='Capture Bluetooth audio to FLAC file.')
bluetooth.add_argument(
'--filename', '-o', default='output.flac', metavar='FILE',
help='Output FLAC file path (default: output.flac)'
)
enc = sub.add_parser('encode', help='Encode a file to FLAC audio.')
enc.add_argument('input', metavar='FILE', help='Source file to encode.')
enc.add_argument('-o', '--output', metavar='FILE',
help='Output .flac path (default: input file with .flac extension).')
enc.add_argument('--play', action='store_true',
help='Play the generated FLAC immediately after encoding.')
enc.add_argument('--device', metavar='N_OR_NAME',
help='Audio output device index or name (for --play).')
_add_codec_args(enc)
dec = sub.add_parser(
'decode',
help='Decode a FLAC file or loopback capture back to the original file.',
)
src = dec.add_mutually_exclusive_group()
src.add_argument('-i', '--input', metavar='FILE',
help='Input .flac file to decode.')
src.add_argument('--loopback', action='store_true',
help='Capture from system loopback (record what the OS is playing).')
dec.add_argument('--duration', type=float, default=60.0, metavar='SEC',
help='Loopback capture duration in seconds (default: 60). '
'Use a value slightly longer than the audio.')
dec.add_argument('--lb-device', metavar='NAME', dest='lb_device',
help='Partial name of the loopback device (default: default speaker).')
dec.add_argument('-o', '--output', required=False, default=None, metavar='FILE',
help='Output file path for recovered data. '
'If omitted, the original filename embedded in the '
'transmission is used (written to the current directory).')
_add_codec_args(dec)
stm = sub.add_parser(
'stream',
help='Play a FLAC, capture loopback simultaneously, then decode -- '
'all in one command.',
)
stm.add_argument('input', metavar='FILE', help='FLAC file to play and decode.')
stm.add_argument('-o', '--output', required=False, default=None, metavar='FILE',
help='Output file for recovered data. '
'If omitted, the original filename embedded in the '
'transmission is used (written to the current directory).')
stm.add_argument('--device', metavar='N_OR_NAME',
help='Playback device index or name (default: system default).')
stm.add_argument('--lb-device', metavar='NAME', dest='lb_device',
help='Loopback device name (default: default speaker loopback).')
stm.add_argument('--tail', type=float, default=3.0, metavar='SEC',
help='Extra seconds to capture after playback ends (default: 3).')
_add_codec_args(stm)
sub.add_parser('devices', help='List available audio output and loopback devices.')
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
dispatch = {
"encode": cmd_encode,
"decode": cmd_decode,
"bluetooth": capturebluetooth,
}
try:
handler = dispatch.get(args.command)
if handler is None:
parser.print_help()
return EXIT_FAILURE
return handler(args)
except KeyboardInterrupt:
print("\nInterrupted by user.", file=sys.stderr)
return EXIT_INTERRUPT
except Exception as exc:
print(f"\nFATAL ERROR: {exc}", file=sys.stderr)
raise
if __name__ == '__main__':
sys.exit(main())