Faster-Whisper 实时识别电脑语音转文本
- 前言
- 项目
- 搭建环境
- 安装Faster-Whisper
- 下载模型
- 编写测试代码
- 运行测试代码
- 实时转写脚本
- 实时转写WebSocket服务器模式
- 参考
以前做的智能对话软件接的Baidu API,想换成本地的,就搭一套Faster-Whisper吧。
pip install faster-whisper
# local_files_only=True 表示加载本地模型# model_size_or_path=path 指定加载模型路径# device="cuda" 指定使用cuda# compute_type="int8_float16" 量化为8位# language="zh" 指定音频语言# vad_filter=True 开启vad# vad_parameters=dict(min_silence_duration_ms=1000) 设置vad参数from faster_whisper import WhisperModelmodel_size = "large-v3"path = r"D:/Project/Python_Project/FasterWhisper/large-v3"# Run on GPU with FP16model = WhisperModel(model_size_or_path=path, device="cuda", local_files_only=True) # or run on GPU with INT8# model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")# or run on CPU with INT8# model = WhisperModel(model_size, device="cpu", compute_type="int8")segments, info = model.transcribe("audio.wav", beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))print("Detected language '%s' with probability %f" % (info.language, info.language_probability))for segment in segments: print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
import osimport sysimport timeimport waveimport tempfileimport threadingimport torchimport pyaudiowpatch as pyaudiofrom faster_whisper import WhisperModel as whisper# A bigger audio buffer gives better accuracy# but also increases latency in response.# 表示音频缓冲时间的常量AUDIO_BUFFER = 5# 此函数使用 PyAudio 库录制音频,并将其保存为一个临时的 WAV 文件。# 使用 pyaudio.PyAudio 实例创建一个音频流,通过指定回调函数 callback 来实时写入音频数据到 WAV 文件。# time.sleep(AUDIO_BUFFER) 会阻塞执行,确保录制足够的音频时间。# 最后,函数返回保存的 WAV 文件的文件名。def record_audio(p, device): """Record audio from output device and save to temporary WAV file.""" with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: filename = wave_file =, "wb") wave_file.setnchannels(device["maxInputChannels"]) wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16)) wave_file.setframerate(int(device["defaultSampleRate"])) def callback(in_data, frame_count, time_info, status): """Write frames and return PA flag""" wave_file.writeframes(in_data) return (in_data, pyaudio.paContinue) stream = format=pyaudio.paInt16, channels=device["maxInputChannels"], rate=int(device["defaultSampleRate"]), frames_per_buffer=pyaudio.get_sample_size(pyaudio.paInt16), input=True, input_device_index=device["index"], stream_callback=callback, ) try: time.sleep(AUDIO_BUFFER) # Blocking execution while playing finally: stream.stop_stream() stream.close() wave_file.close() # print(f"{filename} saved.") return filename# 此函数使用 Whisper 模型对录制的音频进行转录,并输出转录结果。def whisper_audio(filename, model): """Transcribe audio buffer and display.""" # segments, info = model.transcribe(filename, beam_size=5, task="translate", language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000)) segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000)) os.remove(filename) # print(f"{filename} removed.") for segment in segments: # print(f"[{segment.start:.2f} -> {segment.end:.2f}] {segment.text.strip()}") print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))# main 函数是整个脚本的主控制函数。# 加载 Whisper 模型,选择合适的计算设备(GPU 或 CPU)。# 获取默认的 WASAPI 输出设备信息,并选择默认的扬声器(输出设备)。# 使用 PyAudio 开始录制音频,并通过多线程运行 whisper_audio 函数进行音频转录。def main(): """Load model record audio and transcribe from default output device.""" print("Loading model...") device = "cuda" if torch.cuda.is_available() else "cpu" print(f"Using {device} device.") # model = whisper("large-v3", device=device, compute_type="float16") model = whisper("large-v3", device=device, local_files_only=True) print("Model loaded.") with pyaudio.PyAudio() as pya: # Create PyAudio instance via context manager. try: # Get default WASAPI info wasapi_info = pya.get_host_api_info_by_type(pyaudio.paWASAPI) except OSError: print("Looks like WASAPI is not available on the system. Exiting...") sys.exit() # Get default WASAPI speakers default_speakers = pya.get_device_info_by_index( wasapi_info["defaultOutputDevice"] ) if not default_speakers["isLoopbackDevice"]: for loopback in pya.get_loopback_device_info_generator(): # Try to find loopback device with same name(and [Loopback suffix]). # Unfortunately, this is the most adequate way at the moment. if default_speakers["name"] in loopback["name"]: default_speakers = loopback break else: print( """ Default loopback output device not found. Run `python -m pyaudiowpatch` to check available devices. Exiting... """ ) sys.exit() print( f"Recording from: {default_speakers['name']} ({default_speakers['index']})/n" ) while True: filename = record_audio(pya, default_speakers) thread = threading.Thread(target=whisper_audio, args=(filename, model)) thread.start()main()
在最新Google Bard的帮助下,从同步多线程单机版变成了异步WebSocket服务器版本,Unity可以链接并监听实时转写的数据了(写这篇文章时是冬季,ChatGPT实测已经开始休眠状态了)
import asyncioimport osimport waveimport tempfileimport torchimport pyaudiowpatch as pyaudiofrom faster_whisper import WhisperModel as whisperimport websocketsimport json# Audio buffer timeAUDIO_BUFFER = 5# Dictionary to store WebSocket connectionsclients = {}# handle clientasync def handle_client(websocket): client_id = id(websocket) # Using the WebSocket object's ID as a unique identifier print(f"Client connected from {websocket.remote_address} with ID {client_id}") clients[client_id] = websocket try: # print(f"Client connected from {websocket.remote_address}") # Wait for messages from the client async for message in websocket: print(f"Received message from client {client_id}: {message}") # Process the message (you can replace this with your logic) response = f"Server received: {message}" # Send a response back to the client await websocket.send(response) print(f"Sent response to client {client_id}: {response}") except websockets.exceptions.ConnectionClosedError: print(f"Connection with {websocket.remote_address} closed.") finally: # Remove the WebSocket connection when the client disconnects del clients[client_id]# Send a message to all connected clientsasync def send_all_clients(message): if clients==None or clients=={}: print("No clients connected.") return for client_id, websocket in clients.items(): try: await websocket.send(message) print(f"Sent message to client {client_id}: {message}") except Exception as e: print(f"Error sending message to client {client_id}: {e}")# Send a message to a specific client identified by client_idasync def send_message(client_id, message): if client_id in clients: websocket = clients[client_id] await websocket.send(message) print(f"Sent message to client {client_id}: {message}") else: print(f"Client with ID {client_id} not found.")# Start the serverasync def main_server(): server = await websockets.serve(handle_client, "localhost", 8765) print("WebSocket server started. Listening on ws://localhost:8765") await server.wait_closed()#This function records audio using the PyAudio library and saves it as a temporary WAV file.#Use pyaudio PyAudio instance creates an audio stream and writes audio data in real-time to a WAV file by specifying the callback function callback.#Due to the use of the asyncio library, it is no longer necessary to use time. sleep() to block execution, but instead to use asyncio. sleep() to wait asynchronously.#Finally, the function returns the file name of the saved WAV file.async def record_audio(p, device): """Record audio from output device and save to temporary WAV file.""" with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: filename = wave_file =, "wb") wave_file.setnchannels(device["maxInputChannels"]) wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16)) wave_file.setframerate(int(device["defaultSampleRate"])) def callback(in_data, frame_count, time_info, status): """Write frames and return PA flag""" wave_file.writeframes(in_data) return (in_data, pyaudio.paContinue) stream = format=pyaudio.paInt16, channels=device["maxInputChannels"], rate=int(device["defaultSampleRate"]), frames_per_buffer=pyaudio.get_sample_size(pyaudio.paInt16), input=True, input_device_index=device["index"], stream_callback=callback, ) await asyncio.sleep(AUDIO_BUFFER) stream.stop_stream() stream.close() wave_file.close() # print(f"{filename} saved.") return filename # SegmentData classclass SegmentData: def __init__(self, start, end,text): # 实例属性 self.start = start self.end = end self.text = text def __dict__(self): return {"start": self.start, "end": self.end, "text": self.text}def convert_to_unity_data(data): # 参数 data 为字典列表 unity_data = [] for item in data: segment_data = SegmentData(item["start"], item["end"], item["text"]) unity_data.append(segment_data) return unity_data# This function transcribes the recorded audio using the Whisper model and outputs the transcription result.async def whisper_audio(filename, model): """Transcribe audio buffer and display.""" segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000)) os.remove(filename) # print(f"{filename} removed.") if segments: segments_dict_list = [{"start": segment.start, "end": segment.end, "text": segment.text.strip()} for segment in segments] json_transcriptions=json.dumps(segments_dict_list) print(f"Transcription: {json_transcriptions}") try: await send_all_clients(json_transcriptions) except Exception as e: print(f"Error sending message: {e}")# Start recording audio using PyAudio and concurrently run the whisper_audio function for audio transcription using asyncio.gather.async def main(): """Load model record audio and transcribe from default output device.""" print("Loading model...") device = "cuda" if torch.cuda.is_available() else "cpu" print(f"Using {device} device.") model = whisper("large-v3", device=device, local_files_only=True,compute_type="int8_float16") print("Model loaded.") with pyaudio.PyAudio() as pya: # Get microphone device information (assuming you want to select the first microphone device) microphone_index = 0 microphone_info = pya.get_device_info_by_index(microphone_index) while True: filename = await record_audio(pya, microphone_info) await asyncio.gather(whisper_audio(filename, model))async def appmain(): await asyncio.gather(main(), main_server()) # Gather coroutines hereif __name__ == "__main__": # Pass the main coroutine to
Google Bard
基于faster whisper实现实时语音识别项目语音转文本python编程实现