多源异构数据采集与融合应用综合实践

发布时间 2023-12-14 22:57:41作者: 拾霜
数据采集与融合技术实践 多源异构数据采集与融合应用综合实践
组名、项目简介 <组名:洛杉矶耐摔王、项目需求和目标:文字和音频总结、项目开展技术路线:python>
团队成员学号 102102127,102102124,102102125,102102123,102102144,102102146,102102126,102102145
这个项目的目标 对爬取或直接提交的文字进行概括总结,将音频文件转录成文本形式的内容,然后对音频内容进行总结和提取关键信息
链接 https://gitee.com/xmxck/datafusion

一 项目整体

1 引言

在当今数据驱动的时代,多源异构数据采集与融合成为了解决复杂问题和获取全面洞察的关键步骤。我们的小组在多源异构数据采集与融合应用方面做了文字和音频总结,本博客将重点介绍我们的项目整体以及各成员在项目中的分工和贡献。

2 项目整体概述

本项目旨在开发一种应用,能够将文字和音频内容转化为简洁、准确的总结。通过结合先进的语音识别和自然语言处理技术,我们的目标是快速获取关键信息,节省时间和精力。项目的核心功能包括音频转录、文本分析和摘要生成。通过语音识别技术,我们将音频内容转录为文字形式,提供可读的文本输出。然后,通过自然语言处理和文本分析算法,我们将对转录的文本进行处理,提取出关键信息、核心要点和摘要,以便更快速地了解和理解内容。

3团队介绍

团队成员

102102127佘培强,102102124杨恺晖,102102125肖辰恺,102102123杨昕,
102102144郑荣城,102102146洪松渝,102102126吴启严,102102145胡嘉鑫

团队分工

文字总结部分:102102124杨恺晖,102102125肖辰恺,102102126吴启严,102102127佘培强
语音总结部分:102102144郑荣城,102102146洪松渝,102102145胡嘉鑫,102102123杨昕

4 项目亮点和挑战

项目亮点:

多模态处理:项目的亮点之一是能够处理文字和音频这两种不同的模态数据。通过将语音转录为文字,并对文字进行分析和摘要生成,我们能够为用户提供更全面、多样化的信息展示和获取方式。
高准确性和流畅性:项目追求高准确性和流畅性的转录和总结生成。我们将使用先进的语音识别模型和自然语言处理算法,以提供准确的转录结果和清晰的总结。这将为用户提供高质量的文字和音频总结,节省时间和提高工作效率。
自适应技术:项目将考虑数据的多样性和变化性,致力于开发自适应技术,以适应不同语音风格、口音和领域专业术语等。这将增强系统的适应性和可扩展性,适用于各种语音内容的处理和总结。

项目挑战:

语音识别准确性:语音识别是项目的核心技术之一,但在实际应用中,语音识别的准确性仍然面临一定的挑战。不同语音风格、口音、语速等因素可能会影响转录的准确性,需要不断优化和改进模型和算法,以提高识别的准确性。

多模态数据融合:将音频转录为文字后,需要将文字和音频的信息融合起来,生成准确、一致的总结。数据融合和一致性的处理是一个挑战,需要设计合适的算法和策略来确保转录和总结之间的一致性和完整性。

大规模数据处理:处理大规模的文字和音频数据需要高效的算法和系统设计。数据的规模和复杂性可能会导致计算和存储的挑战,需要考虑性能优化和资源管理,以保证系统的稳定性和高效性。

多领域应用:项目的应用领域广泛,涵盖会议记录、学习教育、语音资料处理等多个领域。不同领域的语音内容和需求差异较大,需要充分理解和满足不同领域用户的需求,提供定制化的功能和服务。

5 项目成果和应用

二 个人分工部分

1 分工

本次任务中,我负责文字总结的后端代码实现与华为云接口调用

2 技术工作:

1.使用华为云api,调用文字总结nlp大模型,在此基础上使用文本总结接口,对输入的文件进行概括归纳。
2.传输文件具有复杂性,需要先对数据文件进行清洗,保证导入格式的准确性

3 解决问题:

1.调用华为云大模型时端口出现了问题
2.

4 贡献:

app.py
from flask import Flask, request, jsonify, render_template
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdknlp.v2.region.nlp_region import NlpRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdknlp.v2 import *
import os
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options
from selenium.webdriver.support import expected_conditions as EC
import time

from selenium.webdriver.support.wait import WebDriverWait

import api

app = Flask(__name__)


# 初始化华为云NLP客户端
def create_nlp_client():
    ak = "ORAEUEXV4RREC1ZFPEPN"
    sk = "nUVOhC9fBe85rtfpiX3fyODtgRAEBlfpfxCqafiE"
    credentials = BasicCredentials(ak, sk)
    client = NlpClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(NlpRegion.value_of("cn-north-4")) \
        .build()
    return client


nlp_client = create_nlp_client()

#首页面
@app.route('/')
def index():
    return render_template('index.html')
#处理文本的页面
@app.route('/txt')
def txt():
    return render_template('text_summarizer.html')

#处理音频的页面
@app.route('/audio', methods=['GET'])
def audioPage():
    return render_template('audio_summarizer.html')

#进行文本处理的代码
@app.route('/summarize', methods=['POST'])
def summarize():
    global content
    url_data = request.form.get('url')
    text_data = request.form.get('text')
    file_data = request.files.get('file')
    summary_length = request.form.get('summary_length', type=int)

    content=""

    if url_data:
        # 使用爬虫获取网页内容
        edge_options = Options()
        edge_options.add_argument('--headless')
        edge_options.add_argument('--disable-gpu')
        driver = webdriver.Edge(options=edge_options)
        driver.get(url_data)
        try:
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "h1")))
            content = driver.find_element(By.XPATH, '//*[@class="article"]').text
        except TimeoutException:
            return jsonify({'error': '加载页面元素超时'})
        finally:
            driver.quit()
    elif file_data:
        content = file_data.read().decode('utf-8')
    elif text_data:
        content = text_data

    print(url_data)
    print(content)

    if not content:
        return jsonify({'summary': '没有提供内容。'})


    content = content.replace('\n', '').replace('\r', '').replace('\t', '').replace('\s+', '').replace('\s', '')
    original_length = len(content)
    length_limit = float(summary_length / original_length if original_length > 0 else 0)
    print(content)
    print(length_limit)

    # 使用华为云NLP服务生成总结
    try:
        summary_request = RunSummaryDomainRequest()
        summary_request.body = SummaryDomainReq(
            content=content,
            length_limit=length_limit,  # 或者根据需求调整
            lang="zh"  # 或根据实际情况调整
        )
        response = nlp_client.run_summary_domain(summary_request)
        summary = response.summary
    except exceptions.ClientRequestException as e:
        return jsonify({'error': e.error_msg}), e.status_code

    return jsonify({'summary': summary})

@app.route('/summarize_audio', methods=['POST'])
def summarizeAudioRoute():
    # 获取表单数据
    file = request.files.get('file')

    summary_length = request.form.get('summary_length', type=int) or 10

    if file:
        # 处理上传的文件
        file.save(file.filename)
    else:
        return jsonify({'summary': '没有提供内容。'})

    # 在这里添加你的文本总结逻辑
    content = api.audio2Text('./' + file.filename)
    print(content)
    content = content.replace('\n', '').replace('\r', '').replace('\t', '').replace('\s+', '').replace('\s', '')
    original_length = len(content)

    length_limit = float(summary_length / original_length if original_length > 0 else 0)

    summary = api.summarizeText('', content, length_limit=length_limit)

    return jsonify({'summary': summary})


if __name__ == '__main__':
    app.run(debug=True)

api.py
import json

from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdknlp.v2.region.nlp_region import NlpRegion
from huaweicloudsdkcore.exceptions import exceptions
# from huaweicloudsdknlp.v2 import *
from huaweicloudsdknlp.v2.model.summary_req import SummaryReq
from huaweicloudsdknlp.v2.model.run_summary_request import RunSummaryRequest
from huaweicloudsdknlp.v2 import NlpClient
from huaweicloud_sis.client.asr_client import AsrCustomizationClient
from huaweicloud_sis.bean.asr_request import AsrCustomLongRequest
from huaweicloud_sis.exception.exceptions import ClientException
from huaweicloud_sis.exception.exceptions import ServerException
from huaweicloud_sis.bean.sis_config import SisConfig
import time
from obs import ObsClient
from obs import PutObjectHeader
import os
import traceback


# user specific information
ak = "ORAEUEXV4RREC1ZFPEPN"  # access key
sk = "nUVOhC9fBe85rtfpiX3fyODtgRAEBlfpfxCqafiE"  # secret key
region = "cn-north-4"
#  server填写Bucket对应的Endpoint, 这里以华北-北京四为例,其他地区请按实际情况填写。
obs_server = "https://obs.cn-north-4.myhuaweicloud.com"
obs_bucket_name = "aeba"


def summarizeText(title, content, length_limit=100, lang='zh'):
    credentials = BasicCredentials(ak, sk) \

    client = NlpClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(NlpRegion.value_of(region)) \
        .build()

    try:
        request = RunSummaryRequest()
        request.body = SummaryReq(
            title=title,
            length_limit=length_limit,
            lang=lang,
            content=content
        )
        response = client.run_summary(request)  # type:ignore
        # print(type(response))
        return response.to_dict().get('summary')
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
        raise e


def uploadFileToOBS(file_path, content_type):
    # 创建obsClient实例
    obsClient = ObsClient(
        access_key_id=ak,
        secret_access_key=sk,
        server=obs_server
    )

    try:
        # 上传对象的附加头域
        headers = PutObjectHeader()

        # 【可选】待上传对象的MIME类型
        headers.contentType = content_type

        # 对象名,即上传后的文件名
        objectKey = f"{int(time.time())}_{os.path.basename(file_path)}"

        # 上传文件的自定义元数据
        # metadata = {'meta1': 'value1', 'meta2': 'value2'}

        # 文件上传
        resp = obsClient.putFile(
            obs_bucket_name, objectKey, file_path, headers
        )

        # 返回码为2xx时,接口调用成功,否则接口调用失败
        if resp.status < 300:  # type:ignore
            print('Put File Succeeded')
            # print('requestId:', resp.requestId)  # type:ignore
            # print('etag:', resp.body.etag)  # type:ignore
            # print('versionId:', resp.body.versionId)  # type:ignore
            # print('storageClass:', resp.body.storageClass)  # type:ignore
            return objectKey
        else:
            print('Put File Failed')
            # print('requestId:', resp.requestId)  # type:ignore
            # print('errorCode:', resp.errorCode)  # type:ignore
            # print('errorMessage:', resp.errorMessage)  # type:ignore
    except Exception as e:
        print('Put File Failed')
        print(traceback.format_exc())
        raise e


def audio2Text(file_path, audio_format='audio/mpeg'):
    global result
    try:
        obj_name = uploadFileToOBS(file_path, audio_format)

        project_id = "65eec9da151845e39b812dbaa59ffa43"
        obs_url = f"https://aeba.obs.cn-north-4.myhuaweicloud.com/{obj_name}"
        obs_audio_format = 'auto'
        # obs_property = 'chinese_8k_general'
        obs_property = 'chinese_8k_common'

        # step1 初始化客户端
        config = SisConfig()

        config.set_connect_timeout(10)       # 设置连接超时
        config.set_read_timeout(10)         # 设置读取超时

        asr_client = AsrCustomizationClient(
            ak, sk, region,
            project_id, sis_config=config
        )

        # step2 构造请求
        asrc_request = AsrCustomLongRequest(
            obs_audio_format, obs_property, obs_url
        )

        # 所有参数均可不设置,使用默认值
        # 设置是否添加标点,yes or no,默认no
        asrc_request.set_add_punc('yes')

        # 设置是否将语音中数字转写为阿拉伯数字,yes or no,默认yes
        asrc_request.set_digit_norm('yes')

        # 设置 是否需要分析信息,True or False, 默认False。
        # 只有need_analysis_info生效,diarization、channel、emotion、speed才会生效
        # 目前仅支持8k模型,详见api文档
        asrc_request.set_need_analysis_info(False)

        # 设置是否需要话者分离,默认True,需要need_analysis_info设置为True才生效。
        asrc_request.set_diarization(False)

        # 设置声道信息, 一般都是单声道,默认为MONO,需要need_analysis_info设置为True才生效
        asrc_request.set_channel('MONO')

        # 设置是否返回感情信息, 默认True,需要need_analysis_info设置为True才生效。
        asrc_request.set_emotion(True)

        # 设置是否需要返回语速信息,默认True,需要need_analysis_info设置为True才生效。
        asrc_request.set_speed(True)

        asrc_request.set_need_word_info('no')

        # step3 发送请求,获取job_id
        job_id = asr_client.submit_job(asrc_request)

        # step4 根据job_id轮询,获取结果。
        status = 'WAITING'
        count = 0   # 每2s查询一次,尝试2000次,即4000s。如果音频很长,可适当考虑加长一些。
        while status != 'FINISHED' and count < 2000:
            print(count, ' query')
            result = asr_client.get_long_response(job_id)  # type:ignore
            status = result.get('status')  # type:ignore
            if status == 'ERROR':
                # print('录音文件识别执行失败, %s' % json.dump(result))
                print(result)  # type:ignore
                break
            time.sleep(2)
            count += 1
        if status != 'FINISHED':
            print('录音文件识别未在 %d 内获取结果,job_id 为%s' % (count, job_id))
        # result为json格式
        # print(type(result))  # type:ignore
        segments = result.get("segments")  # type:ignore
        if len(segments) > 0:
            return segments[0].get("result").get("text")
        print(result)
        print(segments)
        print(json.dumps(result, indent=2, ensure_ascii=False))
    except ClientException as e:
        print(e)
    except ServerException as e:
        print(e)