-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspectrogram_collector.py
More file actions
92 lines (58 loc) · 3.48 KB
/
spectrogram_collector.py
File metadata and controls
92 lines (58 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import numpy as np
import matplotlib.pyplot as plt
import pyaudiowpatch as pyaudio
from scipy.signal import spectrogram
import librosa.display
import time
class SpectroCollector:
def __init__(self):
self.RATE = 44100 # Sample rate
self.CHUNK = 4096 # Number of frames per buffer
# Initialize PyAudio
pyaud = pyaudio.PyAudio()
wasapi_info = pyaud.get_host_api_info_by_type(pyaudio.paWASAPI)
default_speakers = pyaud.get_device_info_by_index(wasapi_info["defaultOutputDevice"])
if not default_speakers["isLoopbackDevice"]:
for loopback in pyaud.get_loopback_device_info_generator():
"""
Try to find loopback device with same name(and [Loopback suffix]).
Unfortunately, this is the most adequate way at the moment.
"""
if default_speakers["name"] in loopback["name"]:
default_speakers = loopback
break
else:
exit()
self.RATE = int(default_speakers["defaultSampleRate"])
self.RATE = int(default_speakers["defaultSampleRate"])
print(f"Recording from: ({default_speakers['index']}){default_speakers['name']}")
self.stream = pyaud.open(format=pyaudio.paInt16, channels=default_speakers["maxInputChannels"], rate=int(default_speakers["defaultSampleRate"]), input=True, input_device_index=default_speakers["index"], frames_per_buffer=self.CHUNK)
#RATE = int(default_speakers["defaultSampleRate"])
self.window_buffer_left = np.zeros(int(self.RATE * 1), dtype=np.int16)
self.window_buffer_right = np.zeros(int(self.RATE * 1), dtype=np.int16)
# plt.ion()
# self.fig, (self.ax_left, self.ax_right) = plt.subplots(2, 1, sharex=True, sharey=True)
def get(self):
stereo_data = np.frombuffer(self.stream.read(self.CHUNK), dtype=np.int16)
# TODO
# need to make it so that it outputs empty is there is not much in the buffer or something
# Split stereo data into left and right channels
data_left = stereo_data[::2]
data_right = stereo_data[1::2]
self.window_buffer_left = np.roll(self.window_buffer_left, -self.CHUNK)
self.window_buffer_left[-self.CHUNK:] = data_left
self.window_buffer_right = np.roll(self.window_buffer_right, -self.CHUNK)
self.window_buffer_right[-self.CHUNK:] = data_right
#print(window_buffer_right)
spec_left = librosa.feature.melspectrogram(y=self.window_buffer_left.astype(np.float32), sr=self.RATE)
spec_left_db = librosa.power_to_db(spec_left, ref=np.max)
spec_right = librosa.feature.melspectrogram(y=self.window_buffer_right.astype(np.float32), sr=self.RATE)
spec_right_db = librosa.power_to_db(spec_right, ref=np.max)
spec_left_scaled = (spec_left_db - np.min(spec_left_db)) / (np.max(spec_left_db) - np.min(spec_left_db))
spec_right_scaled = (spec_right_db - np.min(spec_right_db)) / (np.max(spec_right_db) - np.min(spec_right_db))
# librosa.display.specshow(spec_left_scaled, x_axis='time', y_axis='mel', ax=self.ax_left, cmap="viridis")
# librosa.display.specshow(spec_right_scaled, x_axis='time', y_axis='mel', ax=self.ax_right, cmap="viridis")
# self.fig.canvas.draw()
# self.fig.canvas.flush_events()
#print(spec_left_db.shape)
return (spec_left_scaled, spec_right_scaled)