发布于: Oct 30, 2022
视频自动翻译生成字幕对于一些想要了解国外最新科技成果,而又苦于英语水平有限的同学的来说,无疑具有巨大的诱惑力,借助 Amazon Transcribe 便可轻松实现这一过程,下面我们就来看看具体如何操作吧。
首先在 Amazon Web Services 管理控制台进入”S3“服务,点击“创建存储桶”, 输入存储桶的名称,点击“创建”按钮创建一个 s3 存储桶。并且在新创建的桶里,点击“创建文件夹“,创建一个名字叫做“videos”的文件夹用于存放我们的视频文件。
注意这里我们新建一个文件夹存放需要加载字幕的视频,而不是直接把视频放到桶里,这样可以避免我们的 Lambda 函数被自己生成的字幕文件循环调用。
每个 Lambda 函数都有一个与之关联的 IAM 角色。此角色定义允许该功能与其进行交互的其他 Amazon Web Services 服务。在本示例中,您需要创建一个 IAM 角色,授予您的 Lambda 函数权限,以便与 Transcribe 服务以及在上一步中创建的 S3 服务进行交互。
在 Amazon Web Services 管理控制台中,单击“服务”,然后选择“IAM”。在左侧导航栏中选择“角色”,然后选择“创建角色”,依次选择“Amazon Web Services产品”,“Lambda”作为角色类型,然后单击“下一步:权限”按钮,在“筛选策略”选择“AmazonS3FullAccess”,“ Amazon Lambda_FullAccess”和“ AmazonTranscribeFullAccess”,点击“下一步:审核”,中角色名称中输入“addSubtitleRole”,点击“创建角色”。
在 Amazon Web Services 管理控制台进入“Lambda”服务,点击“创建函数”按钮。在“函数名称”中填写函数名称,在“运行时”的选择框中选择“Python 2.7”。这里要特别注意,我们选择更改默认执行角色,在“执行角色”中选择“使用现有角色”,然后选择刚刚创建的角色名称,点击“创建函数”按钮完成函数的创建。在此示例中,我们选择了 Python 2.7 作为开发环境,并为该 Lambda 函数赋予了上一步创建的角色。
在 Lambda 的函数配置页面,点击“添加触发器”按钮添加触发条件。选择“s3”。
在触发条件配置页面,在“存储桶”下拉列表中选择刚刚创建的存储桶名称,在“事件类型”下拉列表中选择“Put”,在“前缀”中输入“videos/”(注意这里要有“/”),在“后缀”中输入“.mp4”,然后点击“添加”按钮,完成触发条件的添加。该触发条件设置监视刚刚创建存储桶的 videos 目录中扩展名为 .mp4 的文件,如果新增一个视频,将触发该 lambda 函数。
在刚创建的 Lambda 函数中,我们需要配置了内存的大小和执行超时。由于 Lambda 函数会调用 Transcribe 服务进行文字提取,因此不需要修改内容的大小,默认值为 128MB。示例中我们采用的视频文件的时长均在半小时内,Transcribe 的处理时间通常不会超过 10 分钟,在这里我们设置超时时长“Timeout”为 10 分钟。
打开附件中的 python 文件,将其内容粘贴在 Lambda 函数实现的区域,点击右上角的“Deploy”按钮。
Lambda 的实现主要包括以下几个步骤:
从 event 对象中和系统变量中获取相关参数信息。
- region:当前区域,示例中使用的是区域是宁夏区域
- bucket_name:存储桶名称,您刚刚创建的存储桶名称
- sourceS3Key:视频文件的 key 值。这里是我们刚才所设置的监测 videos 目录下的 mp4 类型的文件,key 值为 videos/***.mp4
- fn:根据 sourceS3Key 提取文件名。
- dir:根据 sourceS3Key 提取目录名。
- 为每个任务 job_name 创建唯一的标识
- 调用 starttranscriptionjob,下面的代码中介绍了每个参数以及含义
- 由于调用的 job 是异步任务,我们通过轮训的方法检测 job 的返回结果
#生成转换任务的时间戳
now=int(time.time())
job_name = "conv-"+str(now)
MediaFileUri:媒体路径,示例采用 s3 的路径
MediaFormat:媒体格式,目前支持 mp3,mp4,wav,flac
LanguageCode:媒体的语言编码,我们的视频是英文的,设置为 en-US
transcribe = boto3.client('transcribe') transcribe.start_transcription_job( TranscriptionJobName=job_name, Media={'MediaFileUri': job_uri}, MediaFormat='mp4', LanguageCode='en-US' )
#转换需要一定的时间,这里进行轮训检测处理结果。您也可以通过控制台查看任务状态。
while True: status = transcribe.get_transcription_job(TranscriptionJobName=job_name) if status['TranscriptionJob']['TranscriptionJobStatus'] in ['COMPLETED', 'FAILED']: break print("Transcribe is processing...") time.sleep(5)
Transcribe 所抓取转化的字幕 json 数组中,包含每个字(或者词语)的开始时间,结束时间,置信度等信息。下面我们需要把 json 数组转换成字幕文件。常见的字幕格式是 SRT 格式。SRT 的格式非常简单:一句时间代码 + 一句字幕。
if (status['TranscriptionJob']['TranscriptionJobStatus']=='COMPLETED'): url=status['TranscriptionJob']['Transcript']['TranscriptFileUri'] text=downloadJson(url) sJson = json.loads(text) output=process(sJson["results"]['items']) uploadResult(region,bucket_name,dir+'/output/'+fn, output) return { 'statusCode': 200, 'body': json.dumps('Process complete') }
函数 process()将上一步的 json 文件转换成 srt 格式的字幕文件,具体处理过程如下。
def process(items): i=1 output='' isStart=False isEnd=False start_time=0 end_time=0 msg='' for index, item in enumerate(items): if (not item.has_key('start_time')): msg=msg+item['alternatives'][0]['content'] + ' ' else: end_time=float(item['end_time']) if (end_time-start_time>4.0 or index+1==len(items)): isEnd=True if (not isStart and item.has_key('start_time')): isStart=True start_time=float(item['start_time']) msg=msg+item['alternatives'][0]['content'] + ' ' output=output+str(i)+'\n' continue if (isStart and not isEnd and item.has_key('start_time')): msg=msg+item['alternatives'][0]['content'] + ' ' if (isStart and isEnd): hour=int(start_time/60/60) min=int(start_time/60)-hour*60 sec=int(start_time)-min*60-hour*60*60 msec=int((start_time-sec)*1000) e_hour=int(end_time/60/60) e_min=int(end_time/60)-e_hour*60 e_sec=int(end_time)-e_min*60-e_hour*60*60 e_msec=int((end_time-sec)*1000) msg1='{}:{}:{},{} --> {}:{}:{},{}'.format(hour,min,sec,msec, e_hour,e_min,e_sec,e_msec)+'\n' output=output+msg1+msg+item['alternatives'][0]['content']+'\n\n' i=i+1 isStart=False isEnd=False start_time=end_time msg='' return output
最后我们将 srt 文件上传到 s3,本示例中,我们设置了 video/output 作为其输出的存储路径。
def uploadResult(region,bucket_name,fn,body): s3 = boto3.client(service_name='s3',region_name=region) s3.put_object(Bucket=bucket_name, Key=fn[0:-4]+'.srt', Body=body) #remove .mp4
在管理控制台点击“S3”服务,打开刚创建的存储桶,进入“videos”目录,点击“上传”“添加文件”从本地电脑里选择一个视频文件,点击“上传”。此时就会触发我们刚刚创建的 Lambda 函数。我们可以在“Amazon Transcribe”观察 job 的执行情况。点击任何一个 job 的名称,可以显示 job 的详细信息。当 job 的状态显示为“完成”,进入到 S3 存储桶的“输出”目录,您会惊喜的发现,字幕文件已经生成了。
下面我们看一下效果吧。看起来效果还是不错的,除了一些技术词汇可能有些偏差,大部分的对话都可以完整的听录下来。如果需要字幕内容需要翻译,这也是一个很好的起点,可以极大的降低翻译人员的工作量。
做了这么多,我们需要花费多少钱呢?Amazon Web Services 官网很贴心的为大家做了一个常用案例的成本分析:
按照上面的假设,Amazon Transcribe 处理一个半小时左右的网络研讨会,大概只需要人民币 4.86 元。
通过使用无服务器架构的 Amazon Transcribe,结合 Amazon Lambda 和 Amazon S3,用户可以方便的在视频处理的各种场景中,无需购买服务器,方便快速的构建自己的视频字幕转换应用。上面的案例是一个我们在实际场景应用的简化版,适合短视频的处理。但是,在实际的运用中,由于部分视频时间较长,比如 keynote 可能长达 3-4 个小时,如果我们采用现在的架构,就会出现 Lambda 超时的问题。对于长视频的处理,我们就需要优化架构。限于篇幅,具体的方法在本博客中不再赘述,但是小伙伴们可以动起来手来,改进现在的架构,比如把我们的 Lambda 应用分成两个部分,现在的部分仅仅用来启动 Transcribe 的任务,不再轮询去等待任务的完成。而采用 CloudWatch event 来监听 Transcribe job 状态的变化,从而触发另一个 lambda 处理生成的 JSON 格式的字幕文件转化成一个 SRT 的格式,这样既可以避免超时的发生,同时也降低了 Lambda 服务轮询所产生的成本。
相关文章