AI个人学习
和实操指南

Insanely Fast Whisper:快速高效的转录语音为文本的开源项目

综合介绍

insanely-fast-whisper是一个结合了OpenAI的Whisper模型和各种优化技术(如Transformers, Optimum, Flash Attention)的音频转录工具,提供了命令行界面(CLI),旨在快速高效地转录大量音频。它使用Whisper Large v3模型,能够在不到98秒的时间内转录150分钟的音频内容。用户可以通过GitHub仓库了解更多详情、安装指南和使用帮助。

 

多发言人识别


pyannote.audio是一个用Python编写的用于扬声器diarization的开源工具包。基于PyTorch机器学习框架,它具有最先进的预训练模型和管道,可以进一步对自己的数据进行微调,以获得更好的性能。

faster-whisper + pyannote.audio 实现语者识别,实际上只要将二者的识别结果进行结合即可

官方仓库:https://github.com/pyannote/pyannote-audio

 

 

功能列表

使用Whisper Large v3模型进行音频转录
采用Transformers, Optimum, Flash Attention等技术
提供CLI界面
支持不同的优化类型并展示基准测试

 

 

使用帮助

安装: 利用pip进行安装和配置
使用: 直接通过命令行传递参数并运行转录任务
获取帮助: 访问GitHub仓库阅读文档和社区交流

 

 

https://github.com/SYSTRAN/faster-whisper项目编写的google colab代码

# 安装必要的库
get_ipython().system('pip install faster-whisper')
# 导入必要的库
from faster_whisper import available_models
import torch
import ipywidgets as widgets
from IPython.display import display, clear_output
import os  # 导入操作系统库,用于处理文件操作
import gc  # 导入垃圾回收库
# 自动检测设备类型并选择GPU或CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
model_size = "large-v2"  # 默认选择模型大小
compute_type = "float16" if device == "cuda" else "float32"  # 如果使用CPU,则切换到float32
# 获取可用模型的列表
models_list = available_models()
# 默认语言列表
supported_languages = ['en', 'fr', 'de', 'zh', '...']  # 使用默认的语言列表
default_language = 'zh' if 'zh' in supported_languages else supported_languages[0]  # 如果列表中有'zh',则使用它作为默认值;否则使用列表中的第一个值

 

# 创建GUI界面
model_label = widgets.Label('选择模型:')
model_dropdown = widgets.Dropdown(options=models_list, value=model_size)
language_label = widgets.Label('语言:')
language_dropdown = widgets.Dropdown(options=supported_languages, value=default_language)
beam_size_label = widgets.Label('Beam大小:')
beam_size_slider = widgets.IntSlider(value=5, min=1, max=10, step=1)
compute_type_label = widgets.Label('计算类型:')
if device == "cuda":
    compute_type_options = ['float16', 'int8']
else:
    compute_type_options = ['float32']  # 如果是CPU,则锁定为float32
compute_type_dropdown = widgets.Dropdown(options=compute_type_options, value=compute_type)
mode_label = widgets.Label('Format Mode:')
mode_dropdown = widgets.Dropdown(options=['normal', 'timeline', 'subtitle'], value='normal')
initial_prompt_label = widgets.Label('初始提示:')  # 新增的初始提示标签
initial_prompt_text = widgets.Text(value='')  # 新增的初始提示输入框
file_name_text = widgets.Text(description='文件名:', value='/content/')  # 允许用户输入文件名
transcribe_button = widgets.Button(description='转译')
output_area = widgets.Output()

 

# 定义转译函数
def transcribe_audio(b):
    with output_area:
        clear_output()
        print("开始转录...")
        from faster_whisper import WhisperModel  # 动态导入WhisperModel:在需要时导入以节省RAM
        try:
            file_name = file_name_text.value  # 使用用户输入的文件名
            initial_prompt = initial_prompt_text.value  # 使用用户输入的初始提示
            # 确保文件存在
            if not os.path.exists(file_name):
                print(f"文件 {file_name} 不存在,请检查文件名和路径是否正确。")
                return
            # 获取选取的模型
            selected_model = model_dropdown.value
            selected_compute_type = compute_type_dropdown.value
            selected_language = language_dropdown.value
            # 创建新的模型实例并做转译
            model = WhisperModel(selected_model, device=device, compute_type=selected_compute_type)
            try:
                # 转译音频
                segments, info = model.transcribe(file_name, beam_size=beam_size_slider.value, language=selected_language, initial_prompt=initial_prompt)  # 新增的初始提示参数
                # 打印结果
                print("Detected language '%s' with probability %f" % (info.language, info.language_probability))
                for segment in segments:
                    if mode_dropdown.value == 'normal':
                        print("%s " % (segment.text))
                    elif mode_dropdown.value == 'timeline':
                        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
                    else:  # subtitle
                        start_time = "{:02d}:{:02d}:{:02d},{:03d}".format(int(segment.start // 3600), int((segment.start % 3600) // 60), int(segment.start % 60), int((segment.start % 1) * 1000))
                        end_time = "{:02d}:{:02d}:{:02d},{:03d}".format(int(segment.end // 3600), int((segment.end % 3600) // 60), int(segment.end % 60), int((segment.end % 1) * 1000))
                        print("%d\n%s --> %s\n%s\n" % (segment.id, start_time, end_time, segment.text))
            finally:
                # 删除模型实例以释放RAM
                del model
        except Exception as e:
            print("An error occurred during transcription:")
            print(str(e))
        finally:
            # 调用垃圾回收
            gc.collect()
        print("转录完成。")

 

# 组装GUI界面
display(model_label, model_dropdown, language_label, language_dropdown, beam_size_label, beam_size_slider, compute_type_label, compute_type_dropdown, mode_label, mode_dropdown, initial_prompt_label, initial_prompt_text, file_name_text, transcribe_button, output_area)
transcribe_button.on_click(transcribe_audio)
识别多发言人代码示例

from pyannote.core import Segment

def get_text_with_timestamp(transcribe_res):
timestamp_texts = []
for item in transcribe_res:
start = item.start
end = item.end
text = item.text.strip()
timestamp_texts.append((Segment(start, end), text))
return timestamp_texts

def add_speaker_info_to_text(timestamp_texts, ann):
spk_text = []
for seg, text in timestamp_texts:
spk = ann.crop(seg).argmax()
spk_text.append((seg, spk, text))
return spk_text

def merge_cache(text_cache):
sentence = ''.join([item[-1] for item in text_cache])
spk = text_cache[0][1]
start = round(text_cache[0][0].start, 1)
end = round(text_cache[-1][0].end, 1)
return Segment(start, end), spk, sentence

PUNC_SENT_END = [',', '.', '?', '!', ",", "。", "?", "!"]

def merge_sentence(spk_text):
merged_spk_text = []
pre_spk = None
text_cache = []
for seg, spk, text in spk_text:
if spk != pre_spk and pre_spk is not None and len(text_cache) > 0:
merged_spk_text.append(merge_cache(text_cache))
text_cache = [(seg, spk, text)]
pre_spk = spk

elif text and len(text) > 0 and text[-1] in PUNC_SENT_END:
text_cache.append((seg, spk, text))
merged_spk_text.append(merge_cache(text_cache))
text_cache = []
pre_spk = spk
else:
text_cache.append((seg, spk, text))
pre_spk = spk
if len(text_cache) > 0:
merged_spk_text.append(merge_cache(text_cache))
return merged_spk_text

def diarize_text(transcribe_res, diarization_result):
timestamp_texts = get_text_with_timestamp(transcribe_res)
spk_text = add_speaker_info_to_text(timestamp_texts, diarization_result)
res_processed = merge_sentence(spk_text)
return res_processed

def write_to_txt(spk_sent, file):
with open(file, 'w') as fp:
for seg, spk, sentence in spk_sent:
line = f'{seg.start:.2f} {seg.end:.2f} {spk} {sentence}\n'
fp.write(line)

 

import torch
import whisper
import numpy as np
from pydub import AudioSegment
from loguru import logger
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline
from pyannote.audio import Audio

from common.error import ErrorCode

model_path = config["asr"]["faster-whisper-large-v3"]

# 测试音频: https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_speaker_demo.wav
audio = "./test/asr/data/asr_speaker_demo.wav"
asr_model = WhisperModel(model_path, device="cuda", compute_type="float16")
spk_rec_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token="your huggingface token")
spk_rec_pipeline.to(torch.device("cuda"))

asr_result, info = asr_model.transcribe(audio, language="zh", beam_size=5)
diarization_result = spk_rec_pipeline(audio)

final_result = diarize_text(asr_result, diarization_result)
for segment, spk, sent in final_result:
print("[%.2fs -> %.2fs] %s %s" % (segment.start, segment.end, sent, spk))

 

 

相关资源

首席AI分享圈此处内容已经被作者隐藏,请输入验证码查看内容
验证码:
请关注本站微信公众号,回复“验证码”,获取验证码。在微信里搜索“首席AI分享圈”或者“Looks-AI”或者微信扫描右侧二维码都可以关注本站微信公众号。

未经允许不得转载:首席AI分享圈 » Insanely Fast Whisper:快速高效的转录语音为文本的开源项目

首席AI分享圈

首席AI分享圈专注于人工智能学习,提供全面的AI学习内容、AI工具和实操指导。我们的目标是通过高质量的内容和实践经验分享,帮助用户掌握AI技术,一起挖掘AI的无限潜能。无论您是AI初学者还是资深专家,这里都是您获取知识、提升技能、实现创新的理想之地。

联系我们
zh_CN简体中文