数据采集与融合作业4

发布时间 2023-11-14 16:52:47作者: 拾霜

第四次作业

作业①:

要求:

熟练掌握 Selenium 查找HTML元素、爬取Ajax网页数据、等待HTML元素等内容。
使用Selenium框架+ MySQL数据库存储技术路线爬取“沪深A股”、“上证A股”、“深证A股”3个板块的股票数据信息。
候选网站:东方财富网:http://quote.eastmoney.com/center/gridlist.html#hs_a_board
输出信息:MYSQL数据库存储和输出格式如下,表头应是英文命名例如:序号id,股票代码:bStockNo……,由同学们自行定义设计表头:

代码如下
from selenium.webdriver.support import expected_conditions as EC
import mysql.connector
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import ElementNotInteractableException, StaleElementReferenceException, TimeoutException
from io import StringIO
import time

from selenium.webdriver.support.wait import WebDriverWait

# MySQL连接配置
config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'xck030312',
    'database': 'stock'
}

# 创建或更新数据表
def setup_database(cursor):
    cursor.execute(
        """CREATE TABLE IF NOT EXISTS stock3s (
         id INT AUTO_INCREMENT PRIMARY KEY,
        serial_number INT,
        code VARCHAR(10),a
        name VARCHAR(255),
        latest_price DECIMAL(10, 2),
        change_percentage VARCHAR(10),
        change_amount DECIMAL(10, 2),
        trade_volume VARCHAR(20),
        trade_value VARCHAR(20),
        amplitude VARCHAR(10),
        highest_price DECIMAL(10, 2),
        lowest_price DECIMAL(10, 2),
        opening_price DECIMAL(10, 2),
        closing_price DECIMAL(10, 2),
        volume_ratio DECIMAL(10, 2),
        turnover_rate VARCHAR(10),
        pe_ratio DECIMAL(10, 2),
        pb_ratio DECIMAL(10, 2)
        )"""
    )

# 清理并转换数值
def clean_value(value):
    if isinstance(value, str):
        if value == '-':
            return 0.0
        value = value.replace(',', '')  # Remove commas
        value = value.replace('%', '')  # Remove percentage symbol if present
        if value.endswith('%'):  # If percentage, convert to decimal
            return float(value.rstrip('%')) / 100
        return float(value)
    elif isinstance(value, float):
        return value
    else:
        raise ValueError(f"Unexpected data type: {type(value)}")




# 爬取并存储数据
def scrape_and_store(url, page_limit=None):
    # 启动Edge浏览器
    driver = webdriver.Edge()
    driver.get(url)

    page_scraped = 0

    # 连接到MySQL数据库
    db = mysql.connector.connect(**config)
    cursor = db.cursor()
    setup_database(cursor)

    while True:
        table = driver.find_element(By.XPATH, '/html/body/div[1]/div[2]/div[2]/div[5]/div/table')
        table_html = table.get_attribute('outerHTML')

        # 使用pandas处理表格数据
        df = pd.read_html(StringIO(table_html), header=0)[0]  # Wrap table_html with StringIO to avoid FutureWarning
        page_scraped += 1
        if page_limit is not None and page_scraped >= page_limit:
            break
        # 存储数据到MySQL
        for index, row in df.iterrows():
            values = (
                row['序号'], row['代码'], row['名称'], clean_value(row['最新价']), row['涨跌幅'],
                clean_value(row['涨跌额']), row['成交量(手)'], row['成交额'], row['振幅'], clean_value(row['最高']),
                clean_value(row['最低']), clean_value(row['今开']), clean_value(row['昨收']), clean_value(row['量比']), row['换手率'],
                clean_value(row['市盈率(动态)']), clean_value(row['市净率'])
            )
            cursor.execute(
                """INSERT INTO stock3s (
                    serial_number, code, name, latest_price, change_percentage, change_amount, trade_volume,
                    trade_value, amplitude, highest_price, lowest_price, opening_price, closing_price,
                    volume_ratio, turnover_rate, pe_ratio, pb_ratio
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                values
            )

        # 尝试点击“下一页”按钮,如果按钮不可点击则跳出循环
        next_button = driver.find_element(By.CSS_SELECTOR, 'a.next.paginate_button')
        if next_button.get_attribute('disabled') == 'true':
            # The "Next" button is disabled, we are on the last page
            break
        try:
            WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, 'a.next.paginate_button'))
            )
            next_button.click()
            time.sleep(2)
        except Exception as e:
            print(f"An error occurred: {e}")
            break

    # 提交事务并关闭连接
    db.commit()
    cursor.close()
    db.close()
    driver.quit()

# 爬取三个板块的数据
urls = [
    'http://quote.eastmoney.com/center/gridlist.html#hs_a_board',
    #'http://quote.eastmoney.com/center/gridlist.html#sh_a_board',
    #'http://quote.eastmoney.com/center/gridlist.html#sz_a_board'
]
for url in urls:
    scrape_and_store(url,page_limit=100)

运行结果如下

image

感想

这段代码使用Selenium和Pandas来爬取股票数据并将其存储到MySQL数据库中。它首先通过Selenium启动Edge浏览器,访问特定的股市数据网页(如东方财富网的股票板块)。在页面上,它读取股票数据表格,然后利用Pandas处理这些表格数据。数据包括股票序号、代码、名称、最新价等多项指标。在提取数据时,对特殊格式(如百分比、带逗号的数字)进行了清洗和转换处理。随后,脚本将数据插入到MySQL数据库的指定表中。它还包含了翻页逻辑,以处理和抓取多个页面的数据。如果遇到“下一页”按钮不可点击或其他异常,脚本将停止爬取。整个过程是自动化的,旨在从网页上收集大量的股票市场数据并将其存储于数据库中,以便于后续分析和使用。

Gitee文件夹链接

链接

作业②:

要求:

熟练掌握 Selenium 查找HTML元素、实现用户模拟登录、爬取Ajax网页数据、等待HTML元素等内容。
使用Selenium框架+MySQL爬取中国mooc网课程资源信息(课程号、课程名称、学校名称、主讲教师、团队成员、参加人数、课程进度、课程简介)
候选网站:中国mooc网:https://www.icourse163.org
输出信息:MYSQL数据库存储和输出格式

代码如下
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from scrapy.selector import Selector
import mysql.connector

# 数据库配置
config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'xck030312',
    'database': 'stock'
}

# 创建或更新数据表的函数
def setup_database(cursor):
    # 执行SQL命令创建表,如果表不存在
    cursor.execute(
        """CREATE TABLE IF NOT EXISTS mooc (
        id INT AUTO_INCREMENT PRIMARY KEY,
        course VARCHAR(255),
        school VARCHAR(255),
        teacher VARCHAR(255),
        team VARCHAR(255),
        number VARCHAR(255),
        time VARCHAR(255),
        jianjie TEXT
        )"""
    )

# 主函数,程序的入口点
def main():
    url = 'https://www.icourse163.org/'  # 目标网址
    driver = webdriver.Edge()  # 创建Edge浏览器实例
    driver.get(url)  # 访问目标网址

    login(driver)  # 登录
    search_for_courses(driver, "java")  # 搜索课程
    content = driver.page_source  # 获取网页源码
    print(content)  # 打印网页源码
    driver.quit()  # 关闭浏览器

    data = parse_content(content)  # 解析网页源码,提取数据
    save_to_database(data)  # 将数据保存到数据库

# 登录函数
def login(driver):
    # 等待登录按钮可点击,并点击登录按钮
    WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable((By.XPATH, '//*[@id="app"]/div/div/div[1]/div[3]/div[3]/div'))
    ).click()

    # 切换到登录表单的iframe
    driver.switch_to.default_content()
    driver.switch_to.frame(driver.find_elements(By.TAG_NAME, 'iframe')[0])

    # 输入凭据并登录
    enter_credentials(driver, "13606093312", "xck030312")

    # 切换回主内容
    driver.switch_to.default_content()

# 输入凭据并登录的函数
def enter_credentials(driver, phone_number, password):
    # 等待电话输入框出现,清空并输入电话号码
    phone = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, '/html/body/div[2]/div[2]/div[2]/form/div/div[2]/div[2]/input'))
    )
    phone.clear()
    phone.send_keys(phone_number)

    # 清空并输入密码
    passwd = driver.find_element(By.XPATH, '/html/body/div[2]/div[2]/div[2]/form/div/div[4]/div[2]/input[2]')
    passwd.clear()
    passwd.send_keys(password)

    # 点击登录按钮
    submit_btn = driver.find_element(By.XPATH, '//*[@id="submitBtn"]')
    submit_btn.click()

# 搜索课程的函数
def search_for_courses(driver, query):
    # 等待搜索框出现,输入搜索词
    select_course = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH,
                                        '/html/body/div[4]/div[1]/div/div/div/div/div[7]/div[1]/div/div/div[1]/div/div/div/div/div/div/input'))
    )
    select_course.send_keys(query)

    # 等待搜索按钮可点击,并通过JavaScript执行点击操作
    search_btn = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable(
            (By.XPATH, '/html/body/div[4]/div[1]/div/div/div/div/div[7]/div[1]/div/div/div[2]/span'))
    )
    driver.execute_script("arguments[0].click();", search_btn)

# 解析网页内容,提取数据的函数
def parse_content(content):
    # 使用Scrapy的Selector解析HTML
    selector = Selector(text=content)
    rows = selector.xpath("//div[@class='m-course-list']/div/div")
    data = []
    for row in rows:
        # 提取课程信息
        lis = []
        course = "".join(row.xpath(".//span[@class=' u-course-name f-thide']//text()").extract())
        school = row.xpath(".//a[@class='t21 f-fc9']/text()").extract_first()
        teacher = row.xpath(".//a[@class='f-fc9']//text()").extract_first()
        team = ",".join(row.xpath(".//a[@class='f-fc9']//text()").extract())
        number = row.xpath(".//span[@class='hot']/text()").extract_first()
        time = row.xpath(".//span[@class='txt']/text()").extract_first()
        jianjie = ",".join(row.xpath(".//span[@class='p5 brief f-ib f-f0 f-cb']//text()").extract())

        # 组装数据
        lis.extend([course, school, teacher, team, number, time, jianjie])
        data.append(lis)
    return data

# 将数据保存到数据库的函数
def save_to_database(data):
    # 创建数据库连接
    connection = mysql.connector.connect(**config)
    cursor = connection.cursor()

    # 设置数据库
    setup_database(cursor)

    # 创建DataFrame对象
    df = pd.DataFrame(data=data, columns=['course', 'school', 'teacher', 'team', 'number', 'time', 'jianjie'])
    print(df)

    # 遍历数据,将数据插入数据库
    for index, row in df.iterrows():
        cursor.execute(
            """INSERT INTO mooc (course, school, teacher, team, number, time, jianjie)
            VALUES (%s, %s, %s, %s, %s, %s, %s)""",
            (row['course'], row['school'], row['teacher'], row['team'], row['number'], row['time'], row['jianjie'])
        )

    # 提交事务
    connection.commit()

    # 关闭连接
    cursor.close()
    connection.close()

# 程序入口
if __name__ == "__main__":
    main()

运行结果如下

image

感想

这段代码用于自动化抓取网课平台上的课程信息并保存到数据库。首先,它使用Selenium的webdriver打开Edge浏览器,自动化完成登录过程,并搜索特定关键词(如"java")的课程。接着,它从网页源码中提取课程信息,如课程名称、学校、教师、团队、人数、时间和简介。提取的数据使用Scrapy的Selector进行解析,并存储在一个列表中。然后,这个脚本创建或更新一个MySQL数据库表,并将提取的数据保存到这个表中。整个过程自动化进行,实现了从网页数据抓取到数据库存储的流程。

Gitee文件夹链接

链接

作业③

要求:
掌握大数据相关服务,熟悉Xshell的使用
完成文档 华为云_大数据实时分析处理实验手册-Flume日志采集实验(部分)v2.docx 中的任务,即为下面5个任务,具体操作见文档。
环境搭建:
任务一:开通MapReduce服务
实时分析开发实战:
进入服务器
image

任务一:Python脚本生成测试数据
1.编写程序并保存
image
2.创建文件夹,并运行代码
image
3.查看数据
image

任务二:配置Kafka
1.创建topic
image

2.查看topic信息
image

任务三: 安装Flume客户端
1-4步
image
5步
image
6步
image
7步
image
8步
image
9步
image
任务四:配置Flume采集数据
image