Целью моей программы Python является ввод ввода микрофона и преобразовать его в частоты и амплитуду. /> [*] Ввод ввод с микрофона
[*] Конвертировать вход микрофона (который находится в байтах) в значения int
[*] Возьмите преобразованные значения и выполняйте FFT/DFT
Возьмите значения FFT (они содержат частоты и их усилители) и записывают их в файл
ol
ol
ol
ol
ol
ol. /> Понимание программы более подробно:
Основной поток - вход из микрофона и записывает его в queue.queue () (
Код: Выделить всё
mic_input_queue
Код: Выделить всё
byte_to_amplitude_converter_thread
Код: Выделить всё
amplitudes_chunk
Код: Выделить всё
converted_to_frequencies_queue
(извините за то, что я избегал проблем, я старался быть настолько четким, насколько я могу быть).
. очереди (
Код: Выделить всё
mic_input_queue
Код: Выделить всё
put()
Код: Выделить всё
# this is a supporting class in a different file
import threading as ts
class CustomThread(ts.Thread):
"""
Custom implementation of the original python class 'Thread'.
This implementation allows return of value from the function executed
by the thread.
"""
def __init__(self, target=None, args=()):
super().__init__(group=None, target=target, args=args)
self.result = None
self.args = args
def run(self) -> None: self.result = self._target(self.args)
def prerun(self, args): self.args = args
< /code>
# this is another supporting class in a different file
import threading
class ThreadTerminationInformation(threading._RLock):
def __init__(self):
super().__init__()
self.terminate = False
< /code>
# this is the main program------------------------------------------------------------------
import pyaudio as pya
import numpy as np
from time import sleep
import CustomThread
from msvcrt import kbhit
import queue as q
from numba import cuda
import math
from ThreadTerminationInformation import ThreadTerminationInformation
COUNT_OF_NVIDIA_RTX_2060_STREAMING_MULTIPROCESSORS = 30 # Q1
COUNT_OF_NVIDIA_RTX_2060_CUDA_CORES_PER_SM = 64 # Q2
# Q1 * Q2 = 1920. The amount of 'cores' nvidia lists for RTX 2060
THREADS_PER_BLOCK = COUNT_OF_NVIDIA_RTX_2060_CUDA_CORES_PER_SM
FORMAT = pya.paInt16
CHANNELS = 1
RATE = 48000
SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET = math.ceil(RATE * .2)
BLOCKS_PER_GRID = math.ceil(SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET/THREADS_PER_BLOCK)
# 48000 samples per second multiplied by .2 (200 milliseconds)
# I take around 200 milliseconds to pronounce and alphabet
CHUNK_SIZE = SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET # THREADS_PER_BLOCK * COUNT_OF_NVIDIA_RTX_2060_STREAMING_MULTIPROCESSORS
pre_calculated_exponents = np.zeros(shape=(1, SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET),
dtype=np.float16)
pre_calculated_exponents_on_gpu = ''
converted_to_amplitudes_queue = q.Queue()
converted_to_frequencies_queue = q.Queue()
threads_in_this_program = dict()
threads_in_this_program['byte_to_amplitude_converter_thread'] = CustomThread.CustomThread()
threads_in_this_program['gpu_handler_thread'] = CustomThread.CustomThread()
threads_in_this_program['file_writer_thread'] = CustomThread.CustomThread()
terminate_threads = ThreadTerminationInformation()
terminate_threads.acquire(blocking=True, timeout=-1)
terminate_threads.terminate = False
terminate_threads.release()
@cuda.jit
def initialize_fft_computation_assists(a_pre_calculated_exponents_on_gpu,
a_SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET):
"""
This function calculates some arguments for cosine and sine mathematical functions.
This calculation is done on the GPU and stored on the GPU.
"""
base_computation = ((cuda.threadIdx.x) + (cuda.blockIdx.x * THREADS_PER_BLOCK))
y_index = base_computation
if y_index >=\
a_SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET:
a_pre_calculated_exponents_on_gpu[0][y_index] =\
2 * math.pi * (y_index/a_SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET)
def file_writer(a_converted_to_frequencies_queue):
"""
This function just writes the parameter0 to a file.
The function will be run on a separate thread.
"""
global terminate_threads
try:
audio_frequencies_file = open('audio_frequencies.txt', 'a')
except FileNotFoundError:
audio_frequencies_file = open('audio_frequencies.txt', 'w')
audio_frequencies_file.close()
audio_frequencies_file = open('audio_frequencies.txt', 'r+')
line = ''
while True:
print('fw')
try: first_element = a_converted_to_frequencies_queue.get(block=False)
except: pass
a_converted_to_frequencies_queue.task_done()
if len(first_element) > 0:
audio_frequencies_file.write('\n')
# the tags are: 'audio/ frequencies'
line = ''
for index0 in range(0,first_element.shape[0]):
line = line + str(first_element[index0]) + ','
if line[len(line)-1] == ',': line = line[0:len(line)-1]
audio_frequencies_file.write(line + '\n')
audio_frequencies_file.write('\n')
# the tags are: 'audio/ frequencies'
sleep(0.05)
print('checking fw termination')
if terminate_threads.terminate == True and a_converted_to_frequencies_queue.qsize() > 0: break
print('continuing fw')
sleep(0.05)
audio_frequencies_file.close()
print('exiting file writer')
def byte_to_amplitude_converter(amplitudes_as_bytes):
"""
The PyAudio library only outputs in byte values.
So in python you get something like: b'\xff\x00'.
That value is converted to numbers here
"""
starting_index = 0
amplitudes_chunk = list()
global threads_in_this_program
global converted_to_amplitudes_queue
while True:
#print('b2ac')
try: one_chunk = amplitudes_as_bytes.get(block=False)
except: pass
amplitudes_as_bytes.task_done()
if len(one_chunk) > 0:
starting_index = 0
amplitudes_chunk.clear()
while starting_index 0 and \
not threads_in_this_program['gpu_handler_thread'].is_alive():
print('starting gpu handler thread')
threads_in_this_program['gpu_handler_thread'] = CustomThread.CustomThread(target=gpu_handler_function)
threads_in_this_program['gpu_handler_thread'].prerun(
args=(converted_to_amplitudes_queue)
)
threads_in_this_program['gpu_handler_thread'].start()
#amplitudes_as_bytes.task_done()
sleep(0.05)
#print('checking b2ac termination')
if terminate_threads.terminate == True and amplitudes_as_bytes.qsize() == 0: break
#print('continuing b2ac')
sleep(0.05)
print('exiting byte to amplitude converter')
def gpu_handler_function(a_amplitudes_queue):
"""
This is a function that runs on a separate thread.
Responsible for taking in amplitudes from the amplitude queue
and converting them to frequencies (frequencies and the coefficient for each frequency).
"""
amplitudes_queue = a_amplitudes_queue
global pre_calculated_exponents_on_gpu
global threads_in_this_program
global converted_to_frequencies_queue
global terminate_threads
cuda_streams = [cuda.stream(), cuda.stream(), cuda.stream()]
target_stream = 0
print('running gpu handler')
while True:
print('ghf')
try: current_set = amplitudes_queue.get(block=False)
except: pass
amplitudes_queue.task_done()
if len(current_set) > 0:
print('---------------------------------------------------------')
print(len(current_set))
current_set = np.asarray(a=current_set)
print(current_set.shape)
sample_amplitudes_chunk_array_on_gpu = cuda.to_device(current_set,
stream=cuda_streams[target_stream],
copy=True)
sample_fft_coefficients_chunk_array_on_gpu = cuda.device_array(dtype=np.float32, shape=(2, sample_amplitudes_chunk_array_on_gpu.shape[0]))
# fft computation section starts
print(math.ceil(SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET/THREADS_PER_BLOCK))
print(THREADS_PER_BLOCK)
#print(math.ceil(SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET/THREADS_PER_BLOCK) * THREADS_PER_BLOCK)
print(sample_amplitudes_chunk_array_on_gpu.shape)
print(sample_fft_coefficients_chunk_array_on_gpu.shape)
print('---------------------------------------------------------')
fft_computations[BLOCKS_PER_GRID,THREADS_PER_BLOCK
,cuda_streams[target_stream]
](sample_amplitudes_chunk_array_on_gpu,
sample_fft_coefficients_chunk_array_on_gpu,
pre_calculated_exponents_on_gpu,
SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET)
sample_fft_coefficients_on_cpu = sample_fft_coefficients_chunk_array_on_gpu.copy_to_host(
stream=cuda_streams[target_stream]
)
target_stream = (target_stream + 1) % 3
# fft computation section ends
converted_to_frequencies_queue.put(sample_fft_coefficients_on_cpu)
converted_to_frequencies_queue.task_done()
if converted_to_frequencies_queue.qsize() > 0 and\
not threads_in_this_program['file_writer_thread'].is_alive():
print('starting file writer thread')
threads_in_this_program['file_writer_thread'] = CustomThread.CustomThread(target=file_writer)
threads_in_this_program['file_writer_thread'].prerun((converted_to_frequencies_queue))
threads_in_this_program['file_writer_thread'].start()
#print(amplitudes_queue.unfinished_tasks)
sleep(0.05)
print('checking ghf termination')
if terminate_threads.terminate == True and amplitudes_queue.qsize() == 0: break
print('continuing ghf')
sleep(0.05)
print('exiting gpu handler')
@cuda.jit
def fft_computations(amplitudes, frequencies, pre_calculated_exponents, samples_count):
"""
This is where the actual dft computation happens.
X_k = sum(n=0->n=N| x_n * (euler'sconstant)^(k * n / DFT_SIZE))
DFT_SIZE is SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET value
where k varies from 0 to N.
"""
y_index = cuda.threadIdx.x + THREADS_PER_BLOCK * cuda.blockIdx.x
if y_index >= samples_count: return
for index0 in range(0, samples_count):
argument = pre_calculated_exponents[0][y_index] * (index0)
frequencies[0][index0] = amplitudes[index0] * math.sqrt((math.cos(argument) ** 2) + (math.sin(argument) ** 2))
frequencies[1][index0] = index0 * (RATE / samples_count)
def convert_audio_to_frequencies():
"""
This is where the core calculation happens.
Takes input amplitude from microphone(s).
Does FFT using numpy.
Dump the data into frequency format for the model.
FORMAT: This is the resolution of data collected values can one of take 2^16 values (from 0 to (2^16)-1)
CHANNELS: In this case how many input channels to take data from. Depends on system hardware
RATE: Sampling rate of data (how many times data collected per second)
CHUNK: group of samples
"""
global pre_calculated_exponents
global pre_calculated_exponents_on_gpu
pre_calculated_exponents_on_gpu = cuda.to_device(pre_calculated_exponents)
initialize_fft_computation_assists[math.ceil((SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET)/64), 64](pre_calculated_exponents_on_gpu, SAMPLES_WITHIN_A_SECOND_WHILE_PRONOUNCING_AN_ALPHABET)
audio_facilities = pya.PyAudio()
microphone_input = audio_facilities.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK_SIZE)
print('The recording has begun')
mic_input_queue = q.Queue()
while True:
mic_input_queue.put(microphone_input.read(CHUNK_SIZE))
#mic_input_queue.task_done()
if mic_input_queue.qsize() > 0 and\
not threads_in_this_program['byte_to_amplitude_converter_thread'].is_alive():
print('starting byte converter thread')
threads_in_this_program['byte_to_amplitude_converter_thread'] = CustomThread.CustomThread(target=byte_to_amplitude_converter)
threads_in_this_program['byte_to_amplitude_converter_thread'].prerun(mic_input_queue)
threads_in_this_program['byte_to_amplitude_converter_thread'].start()
print('Main thread running')
sleep(0.004)
break
if kbhit(): break
#print('The recording has ended')
microphone_input.stop_stream()
microphone_input.close()
audio_facilities.terminate()
#print('setting termination')
terminate_threads.acquire(blocking=True, timeout=-1)
terminate_threads.terminate = True
terminate_threads.release()
#print('set termination')
print(terminate_threads.terminate)
while True:
print('waiting on other threads to finish: ')
print('bacthread: ' + str(threads_in_this_program['byte_to_amplitude_converter_thread'].is_alive()))
print('ghthread: ' + str(threads_in_this_program['gpu_handler_thread'].is_alive()))
print('fwthread: ' + str(threads_in_this_program['file_writer_thread'].is_alive()))
print('aqtr: ' + str(converted_to_amplitudes_queue.unfinished_tasks))
print('fqtr: ' + str(converted_to_frequencies_queue.unfinished_tasks))
sleep(5)
print('exiting')
if __name__ == '__main__':
convert_audio_to_frequencies()
# end of main program-----------------------------------------------------------------
Ищите '/ * ############################################################################### */
Код: Выделить всё
(newpro) PS C:\Users\vishw\Desktop\newpro> python .\convert_audio_to_frequencies.py
The recording has begun
starting byte converter thread
starting gpu handler thread
Main thread running
running gpu handler
ghf
Exception in thread Thread-5 (gpu_handler_function):
Traceback (most recent call last):
File "C:\Users\vishw\AppData\Local\Programs\Python\Python313\Lib\threading.py", line 1041, in _bootstrap_inner
self.run()
~~~~~~~~^^
File "C:\Users\vishw\Desktop\newpro\CustomThread.py", line 14, in run
def run(self) -> None: self.result = self._target(self.args)
~~~~~~~~~~~~^^^^^^^^^^^
File "C:\Users\vishw\Desktop\newpro\convert_audio_to_frequencies.py", line 156, in gpu_handler_function
amplitudes_queue.task_done()
~~~~~~~~~~~~~~~~~~~~~~~~~~^^
File "C:\Users\vishw\AppData\Local\Programs\Python\Python313\Lib\queue.py", line 93, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
Exception in thread Thread-4 (byte_to_amplitude_converter):
Traceback (most recent call last):
File "C:\Users\vishw\AppData\Local\Programs\Python\Python313\Lib\threading.py", line 1041, in _bootstrap_inner
self.run()
~~~~~~~~^^
File "C:\Users\vishw\Desktop\newpro\CustomThread.py", line 14, in run
def run(self) -> None: self.result = self._target(self.args)
~~~~~~~~~~~~^^^^^^^^^^^
File "C:\Users\vishw\Desktop\newpro\convert_audio_to_frequencies.py", line 107, in byte_to_amplitude_converter
amplitudes_as_bytes.task_done()
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
File "C:\Users\vishw\AppData\Local\Programs\Python\Python313\Lib\queue.py", line 93, in task_done
raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
/* #####################################
The ValueError above is one of the problems.
*/
True
waiting on other threads to finish:
bacthread: False
ghthread: False
fwthread: False
aqtr: 0
fqtr: 0
waiting on other threads to finish:
bacthread: False
ghthread: False
fwthread: False
aqtr: 0
fqtr: 0
waiting on other threads to finish:
bacthread: False
ghthread: False
fwthread: False
aqtr: 0
fqtr: 0
Traceback (most recent call last):
File "C:\Users\vishw\Desktop\newpro\convert_audio_to_frequencies.py", line 272, in
convert_audio_to_frequencies()
~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^
File "C:\Users\vishw\Desktop\newpro\convert_audio_to_frequencies.py", line 268, in convert_audio_to_frequencies
sleep(5)
~~~~~^^^
KeyboardInterrupt
(newpro) PS C:\Users\vishw\Desktop\newpro>
Код: Выделить всё
import numpy as np
import queue as q
import CustomThread
from time import sleep
import threading as ts
class CustomThread(ts.Thread):
"""
Custom implementation of the original python class 'Thread'.
This implementation allows return of value from the function executed
by the thread.
"""
def __init__(self, target=None, args=()):
super().__init__(group=None, target=target, args=args)
self.result = None
self.args = args
def run(self) -> None: self.result = self._target(self.args)
def prerun(self, args): self.args = args
SWAS = 9600 # This is how many samples I would take in FFT in the full code
conv_to_amp_q = q.Queue() # full name: converted_to_amplitudes_queue
conv_to_freq_q = q.Queue() # full name: converted_to_frequencies_queue
threads = dict()
threads['btac'] = CustomThread()
# full name: byte_to_amplitude_converter
threads['ght'] = CustomThread()
# full name: gpu_handler_thread
def btac_func(amp_as_bytes):
# this function converts byte values into 'int' values
# and puts them in another queue. It also checks
# whether a 3rd thread apart from this thread and the main thread
# is running and processing values from the queue
start_ind = 0
amp_chunk = list()
count = 0 # this is to help your code stop execution, else it might run forever
global threads
global conv_to_amp_q
while True:
try: one_chunk = amp_as_bytes.get(block=False)
except: pass
# try block helps avoid errors due to queue not read properly
amp_as_bytes.task_done()
if len(one_chunk) > 0:
# we got some value from the queue? convert those bytes into ints
start_ind = 0
amp_chunk.clear()
while start_ind 0 and \
not threads['ght'].is_alive():
# here we constantly check and run the 3rd thread
threads['ght'] = CustomThread(target=ghf_func)
threads['ght'].prerun(
args=(conv_to_amp_q)
)
threads['ght'].start()
sleep(0.1)
count = count + 1
if count == 5: break
def ghf_func(a_amp_q):
# this function is in lieu of the function handling the gpu work
# in the full code
amp_q = a_amp_q # queue to read from where values are put in by btac
global threads
global conv_to_freq_q
count = 0 # this is to help your code stop execution, else it might run forever
while True:
try: current_set = amp_q.get(block=False)
except: pass
# the above try code block helps us avoid empty queue errors
amp_q.task_done()
if len(current_set) > 0:
# we received a value? do something with it. In the full code
# version. This function is working with the GPU
current_set = np.asarray(a=current_set)
sleep(0.15)
count = count + 1
if count == 5: break
x = ''# please get this from onedrive link in the body below this code section as the value is too large.
# it is just a byte sequence
mic_in_q = q.Queue()
count = 0
while True:
# we try to copy the full code working of repeatedly inserting byte values from microphone input
# and then try to run and constantly check if the thread is running
mic_in_q.put(x)
if mic_in_q.qsize() > 0 and\
not threads['btac'].is_alive():
# we check whether we have received inputs and the second thread is running?
threads['btac'] = CustomThread(target=btac_func)
threads['btac'].prerun(mic_in_q)
threads['btac'].start()
sleep(0.004)
count = count + 1
if count == 5: break
Подробнее здесь: https://stackoverflow.com/questions/795 ... le-threads