Faster-Whisper 实时识别电脑语音转文本

AIGC 0

Faster-Whisper 实时识别电脑语音转文本

  • 前言
  • 项目
    • 搭建环境
    • 安装Faster-Whisper
    • 下载模型
    • 编写测试代码
    • 运行测试代码
    • 实时转写脚本
    • 实时转写WebSocket服务器模式
  • 参考

前言

以前做的智能对话软件接的Baidu API,想换成本地的,就搭一套Faster-Whisper吧。
下面是B站视频实时转写的截图
效果图

项目

搭建环境

所需要的CUDANN已经装好了,如果装的是12.2应该是包含cuBLAS了
没装的,可以从下面链接下载装一下,文末的参考视频中也有讲解
https://github.com/Purfview/whisper-standalone-win/releases/tag/libs

配置好的CUDANN

Ancanda的运行环境去Clone一下之前配好的环境,用之前BertVits的即可

安装Faster-Whisper

输入即可安装

pip install faster-whisper

下载模型

https://huggingface.co/Systran/faster-whisper-large-v3
下载完放到代码旁边就可以了
模型放代码同文件夹

编写测试代码

模型放这里

# local_files_only=True 表示加载本地模型# model_size_or_path=path 指定加载模型路径# device="cuda" 指定使用cuda# compute_type="int8_float16" 量化为8位# language="zh" 指定音频语言# vad_filter=True 开启vad# vad_parameters=dict(min_silence_duration_ms=1000) 设置vad参数from faster_whisper import WhisperModelmodel_size = "large-v3"path = r"D:/Project/Python_Project/FasterWhisper/large-v3"# Run on GPU with FP16model = WhisperModel(model_size_or_path=path, device="cuda", local_files_only=True) # or run on GPU with INT8# model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")# or run on CPU with INT8# model = WhisperModel(model_size, device="cpu", compute_type="int8")segments, info = model.transcribe("audio.wav", beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))print("Detected language '%s' with probability %f" % (info.language, info.language_probability))for segment in segments:    print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

运行测试代码

找个音频放入文件夹内,输入python main.py即可运行!
可以看到正确(不太正确)的识别出了音频说了什么。
运行效果

实时转写脚本

新建一个脚本transper.py
运行即可

此处特别感谢开源项目
https://github.com/MyloBishop/transper

import osimport sysimport timeimport waveimport tempfileimport threadingimport torchimport pyaudiowpatch as pyaudiofrom faster_whisper import WhisperModel as whisper# A bigger audio buffer gives better accuracy# but also increases latency in response.# 表示音频缓冲时间的常量AUDIO_BUFFER = 5# 此函数使用 PyAudio 库录制音频,并将其保存为一个临时的 WAV 文件。# 使用 pyaudio.PyAudio 实例创建一个音频流,通过指定回调函数 callback 来实时写入音频数据到 WAV 文件。# time.sleep(AUDIO_BUFFER) 会阻塞执行,确保录制足够的音频时间。# 最后,函数返回保存的 WAV 文件的文件名。def record_audio(p, device):    """Record audio from output device and save to temporary WAV file."""    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:        filename = f.name        wave_file = wave.open(filename, "wb")        wave_file.setnchannels(device["maxInputChannels"])        wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))        wave_file.setframerate(int(device["defaultSampleRate"]))        def callback(in_data, frame_count, time_info, status):            """Write frames and return PA flag"""            wave_file.writeframes(in_data)            return (in_data, pyaudio.paContinue)        stream = p.open(            format=pyaudio.paInt16,            channels=device["maxInputChannels"],            rate=int(device["defaultSampleRate"]),            frames_per_buffer=pyaudio.get_sample_size(pyaudio.paInt16),            input=True,            input_device_index=device["index"],            stream_callback=callback,        )        try:            time.sleep(AUDIO_BUFFER)  # Blocking execution while playing        finally:            stream.stop_stream()            stream.close()            wave_file.close()            # print(f"{filename} saved.")    return filename# 此函数使用 Whisper 模型对录制的音频进行转录,并输出转录结果。def whisper_audio(filename, model):    """Transcribe audio buffer and display."""    # segments, info = model.transcribe(filename, beam_size=5, task="translate", language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))    segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))    os.remove(filename)    # print(f"{filename} removed.")    for segment in segments:        # print(f"[{segment.start:.2f} -> {segment.end:.2f}] {segment.text.strip()}")        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))# main 函数是整个脚本的主控制函数。# 加载 Whisper 模型,选择合适的计算设备(GPU 或 CPU)。# 获取默认的 WASAPI 输出设备信息,并选择默认的扬声器(输出设备)。# 使用 PyAudio 开始录制音频,并通过多线程运行 whisper_audio 函数进行音频转录。def main():    """Load model record audio and transcribe from default output device."""    print("Loading model...")    device = "cuda" if torch.cuda.is_available() else "cpu"    print(f"Using {device} device.")    # model = whisper("large-v3", device=device, compute_type="float16")    model = whisper("large-v3", device=device, local_files_only=True)    print("Model loaded.")    with pyaudio.PyAudio() as pya:        # Create PyAudio instance via context manager.        try:            # Get default WASAPI info            wasapi_info = pya.get_host_api_info_by_type(pyaudio.paWASAPI)        except OSError:            print("Looks like WASAPI is not available on the system. Exiting...")            sys.exit()        # Get default WASAPI speakers        default_speakers = pya.get_device_info_by_index(            wasapi_info["defaultOutputDevice"]        )        if not default_speakers["isLoopbackDevice"]:            for loopback in pya.get_loopback_device_info_generator():                # Try to find loopback device with same name(and [Loopback suffix]).                # Unfortunately, this is the most adequate way at the moment.                if default_speakers["name"] in loopback["name"]:                    default_speakers = loopback                    break            else:                print(                    """                    Default loopback output device not found.                    Run `python -m pyaudiowpatch` to check available devices.                    Exiting...                    """                )                sys.exit()        print(            f"Recording from: {default_speakers['name']} ({default_speakers['index']})/n"        )        while True:            filename = record_audio(pya, default_speakers)            thread = threading.Thread(target=whisper_audio, args=(filename, model))            thread.start()main()

实时转写WebSocket服务器模式

在最新Google Bard的帮助下,从同步多线程单机版变成了异步WebSocket服务器版本,Unity可以链接并监听实时转写的数据了(写这篇文章时是冬季,ChatGPT实测已经开始休眠状态了

import asyncioimport osimport waveimport tempfileimport torchimport pyaudiowpatch as pyaudiofrom faster_whisper import WhisperModel as whisperimport websocketsimport json# Audio buffer timeAUDIO_BUFFER = 5# Dictionary to store WebSocket connectionsclients = {}# handle clientasync def handle_client(websocket):   client_id = id(websocket)  # Using the WebSocket object's ID as a unique identifier   print(f"Client connected from {websocket.remote_address} with ID {client_id}")   clients[client_id] = websocket   try:      # print(f"Client connected from {websocket.remote_address}")      # Wait for messages from the client      async for message in websocket:         print(f"Received message from client {client_id}: {message}")         # Process the message (you can replace this with your logic)         response = f"Server received: {message}"         # Send a response back to the client         await websocket.send(response)         print(f"Sent response to client {client_id}: {response}")   except websockets.exceptions.ConnectionClosedError:      print(f"Connection with {websocket.remote_address} closed.")   finally:      # Remove the WebSocket connection when the client disconnects      del clients[client_id]# Send a message to all connected clientsasync def send_all_clients(message):   if clients==None or clients=={}:       print("No clients connected.")      return   for client_id, websocket in clients.items():      try:         await websocket.send(message)         print(f"Sent message to client {client_id}: {message}")      except Exception as e:         print(f"Error sending message to client {client_id}: {e}")# Send a message to a specific client identified by client_idasync def send_message(client_id, message):   if client_id in clients:      websocket = clients[client_id]      await websocket.send(message)      print(f"Sent message to client {client_id}: {message}")   else:      print(f"Client with ID {client_id} not found.")# Start the serverasync def main_server():   server = await websockets.serve(handle_client, "localhost", 8765)   print("WebSocket server started. Listening on ws://localhost:8765")   await server.wait_closed()#This function records audio using the PyAudio library and saves it as a temporary WAV file.#Use pyaudio PyAudio instance creates an audio stream and writes audio data in real-time to a WAV file by specifying the callback function callback.#Due to the use of the asyncio library, it is no longer necessary to use time. sleep() to block execution, but instead to use asyncio. sleep() to wait asynchronously.#Finally, the function returns the file name of the saved WAV file.async def record_audio(p, device):    """Record audio from output device and save to temporary WAV file."""    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:        filename = f.name        wave_file = wave.open(filename, "wb")        wave_file.setnchannels(device["maxInputChannels"])        wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))        wave_file.setframerate(int(device["defaultSampleRate"]))        def callback(in_data, frame_count, time_info, status):            """Write frames and return PA flag"""            wave_file.writeframes(in_data)            return (in_data, pyaudio.paContinue)        stream = p.open(            format=pyaudio.paInt16,            channels=device["maxInputChannels"],            rate=int(device["defaultSampleRate"]),            frames_per_buffer=pyaudio.get_sample_size(pyaudio.paInt16),            input=True,            input_device_index=device["index"],            stream_callback=callback,        )        await asyncio.sleep(AUDIO_BUFFER)        stream.stop_stream()        stream.close()        wave_file.close()        # print(f"{filename} saved.")    return filename # SegmentData classclass SegmentData:    def __init__(self, start, end,text):        # 实例属性        self.start = start        self.end = end        self.text = text    def __dict__(self):        return {"start": self.start, "end": self.end, "text": self.text}def convert_to_unity_data(data):  # 参数 data 为字典列表    unity_data = []    for item in data:        segment_data = SegmentData(item["start"], item["end"], item["text"])        unity_data.append(segment_data)    return unity_data# This function transcribes the recorded audio using the Whisper model and outputs the transcription result.async def whisper_audio(filename, model):    """Transcribe audio buffer and display."""    segments, info = model.transcribe(filename, beam_size=5, language="zh", vad_filter=True, vad_parameters=dict(min_silence_duration_ms=1000))    os.remove(filename)    # print(f"{filename} removed.")    if segments:        segments_dict_list = [{"start": segment.start, "end": segment.end, "text": segment.text.strip()} for segment in segments]        json_transcriptions=json.dumps(segments_dict_list)        print(f"Transcription: {json_transcriptions}")        try:            await send_all_clients(json_transcriptions)        except Exception as e:            print(f"Error sending message: {e}")# Start recording audio using PyAudio and concurrently run the whisper_audio function for audio transcription using asyncio.gather.async def main():    """Load model record audio and transcribe from default output device."""    print("Loading model...")    device = "cuda" if torch.cuda.is_available() else "cpu"    print(f"Using {device} device.")    model = whisper("large-v3", device=device, local_files_only=True,compute_type="int8_float16")    print("Model loaded.")    with pyaudio.PyAudio() as pya:        # Get microphone device information (assuming you want to select the first microphone device)        microphone_index = 0        microphone_info = pya.get_device_info_by_index(microphone_index)        while True:            filename = await record_audio(pya, microphone_info)            await asyncio.gather(whisper_audio(filename, model))async def appmain():    await asyncio.gather(main(), main_server())  # Gather coroutines hereif __name__ == "__main__":    asyncio.run(appmain())  # Pass the main coroutine to asyncio.run()

参考

faster-whisper
MyloBishop/transper
Google Bard
基于faster_whisper的实时语音识别
基于faster whisper实现实时语音识别项目语音转文本python编程实现

也许您对下面的内容还感兴趣: