Data Collection and Fusion Technology: Practical Assignment 2

Published: 2023-10-19 00:52:33    Author: chencanming

Second Assignment

Gitee link: https://gitee.com/crazypsz/spider/commit/566b31106cde3cd68bd87c63e851b299542e6565

Task 1

Experiment

Requirement: scrape the 7-day weather forecast for a given set of cities from the China Weather Network (http://www.weather.com.cn).

Code:

import requests
from bs4 import BeautifulSoup
import sqlite3
import logging

logging.basicConfig(level=logging.INFO)

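# SQLite helper used as a context manager: connects and creates the table on enter, commits and closes the connection on exit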
class WeatherDB:
    def __init__(self, db_name):
        self.db_name = db_name

    def __enter__(self):
        self.con = sqlite3.connect(self.db_name)
        self.cursor = self.con.cursor()
        self.create_table()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.con.commit()
        self.con.close()

    def create_table(self):
        self.cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS weathers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                city VARCHAR(16),
                date VARCHAR(16),
                weather VARCHAR(64),
                temp VARCHAR(32),
                UNIQUE(city, date)
            )
            """
        )

    def insert(self, city, date, weather, temp):
        try:
            self.cursor.execute(
                "INSERT OR IGNORE INTO weathers (city, date, weather, temp) VALUES (?, ?, ?, ?)",
                (city, date, weather, temp)
            )
            logging.info(f"Inserted data for {city} on {date}")
        except Exception as e:
            logging.error(str(e))

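# Fetches and parses the 7-day forecast page on weather.com.cn for each city listed in CITY_CODE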
class WeatherForecast:
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    CITY_CODE = {
        "北京": "101010100",
        "上海": "101020100",
        "广州": "101280101",
        "深圳": "101280601",
        "福州": "101230101"
    }

    def __init__(self):
        pass

    def get_forecast(self, city):
        if city not in self.CITY_CODE:
            logging.error(f"{city} code cannot be found")
            return []

        url = f"http://www.weather.com.cn/weather/{self.CITY_CODE[city]}.shtml"
        response = requests.get(url, headers=self.HEADERS)
        if response.status_code != 200:
            logging.error(f"Failed to get data for {city}")
            return []

        response.encoding = 'utf-8'  # Ensure using UTF-8 encoding
        soup = BeautifulSoup(response.text, 'lxml')
        lis = soup.select("ul[class='t clearfix'] li")

        forecasts = []
        for li in lis:
            try:
                date = li.select_one('h1').text
                weather = li.select_one('p.wea').text
                temp = f"{li.select_one('p.tem span').text}/{li.select_one('p.tem i').text}"
                forecasts.append((city, date, weather, temp))
            except Exception as e:
                logging.error(f"Error processing data for {city}: {str(e)}")
        return forecasts

if __name__ == "__main__":
    with WeatherDB("weathers.db") as db:
        wf = WeatherForecast()
        for city in ["北京", "上海", "广州", "深圳", "福州"]:
            forecasts = wf.get_forecast(city)
            for forecast in forecasts:
                db.insert(*forecast)
                print(f"Inserted data for {city} on {forecast[1]}")

Results:

Reflections: became more familiar with working with the SQLite database.
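
To double-check what ended up in the database, here is a minimal sketch (assuming the weathers.db file and the weathers table created by the script above) that reads the stored rows back:

import sqlite3

# Open the database produced by the weather script above
con = sqlite3.connect("weathers.db")
cursor = con.cursor()

# Column names match the CREATE TABLE statement in WeatherDB.create_table
for city, date, weather, temp in cursor.execute(
        "SELECT city, date, weather, temp FROM weathers ORDER BY city, date"):
    print(city, date, weather, temp)

con.close()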

Task 2

Experiment

Requirement: use the requests and BeautifulSoup libraries to scrape stock information from a target site and store it in a database.

Candidate sites: Eastmoney: https://www.eastmoney.com/

Sina Finance (stocks): http://finance.sina.com.cn/stock/

Code:

import time
from io import StringIO
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service

class StockScraper:
    def __init__(self, url, num_pages_to_scrape, output_file):
        self.url = url
        self.num_pages_to_scrape = num_pages_to_scrape
        self.output_file = output_file
        self.driver = None

    def setup_driver(self):
        service = Service(executable_path="C:/Users/668/Desktop/chromedriver-win64/chromedriver.exe")
        self.driver = webdriver.Chrome(service=service)

    def close_driver(self):
        if self.driver:
            self.driver.quit()

    def fetch_stock_data(self):
        try:
            print("Starting the script...")
            self.setup_driver()

            print("Opening the page...")
            self.driver.get(self.url)

            # Wait for the page to load
            time.sleep(10)  # adjust the wait time as needed

            print("Reading the quote table...")

            # Accumulates the stock data from every scraped page
            all_data = pd.DataFrame()

            for _ in range(self.num_pages_to_scrape):
                try:
                    # Grab the quote table element from the page
                    table = self.driver.find_element(By.XPATH, '/html/body/div[1]/div[2]/div[2]/div[5]/div/table')
                    table_html = table.get_attribute('outerHTML')

                    # Parse the table HTML with pandas (wrapped in StringIO to avoid the literal-HTML deprecation warning)
                    df = pd.read_html(StringIO(table_html), header=0, converters={'代码': str})[0]

                    # Drop the '相关链接' (related links) and '加自选' (add to watchlist) columns
                    df = df.drop(columns=['相关链接', '加自选'])

                    # Keep stock codes as 6-digit strings with leading zeros; the apostrophe stops spreadsheets from trimming them
                    df['代码'] = df['代码'].apply(lambda x: "'" + str(x).zfill(6))

                    # Append this page's rows to the combined DataFrame
                    all_data = pd.concat([all_data, df], ignore_index=True)

                    # Click the '下一页' (next page) button
                    next_button = self.driver.find_element(By.XPATH, '//a[text()="下一页"]')
                    next_button.click()

                    # Wait for the next page to load
                    time.sleep(10)  # adjust the wait time as needed

                except Exception as e:
                    print("Error while fetching data:", e)
                    break  # stop scraping if anything goes wrong

            print(all_data)

            # Save the data to a CSV file
            all_data.to_csv(self.output_file, index=False, encoding='utf-8-sig')

            print("Data saved successfully.")

        except Exception as e:
            print("An exception occurred:", e)
        finally:
            self.close_driver()

# Run the scraper
scraper = StockScraper(
    url='http://quote.eastmoney.com/center/gridlist.html#sz_a_board',
    num_pages_to_scrape=5,
    output_file='stocks.csv'
)
scraper.fetch_stock_data()

Results:

Reflections: learned how to look for the data interface; during the process I was careless and skipped the decompilation step.
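
The task asks for the data to be stored in a database, while the script above only writes stocks.csv. Below is a minimal sketch (assuming the stocks.csv produced above; the stocks.db file name is made up for illustration) that loads the CSV into SQLite with pandas:

import sqlite3
import pandas as pd

# Read the CSV written by the Selenium scraper above
df = pd.read_csv("stocks.csv")

# Write it to an SQLite table named "stocks"; stocks.db is a hypothetical file name
con = sqlite3.connect("stocks.db")
df.to_sql("stocks", con, if_exists="replace", index=False)
con.close()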

Task 3

Experiment

Requirement: scrape the information of all universities in the 2021 Main Ranking of Chinese Universities (https://www.shanghairanking.cn/rankings/bcur/2021), store it in a database, and record the browser F12 debugging and analysis process as a GIF to include in the blog post.

Code:

import time
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service

class UniversityRankingScraper:
    def __init__(self, url, output_file):
        self.url = url
        self.output_file = output_file
        self.data_list = []

    def setup_driver(self):
        service = Service(executable_path="D:/Google/chrome/Application/chromedriver-win64/chromedriver.exe")
        return webdriver.Chrome(service=service)

    def scrape_data(self):
        driver = None  # so the finally block works even if setup_driver() fails
        try:
            driver = self.setup_driver()

            # Open the ranking page
            driver.get(self.url)

            # Keep looping until the '下一页' (next page) button can no longer be clicked
            while True:
                # Get all row elements of the ranking table
                rows = driver.find_elements(By.XPATH,'//tbody/tr')

                # Walk each row and collect its fields
                for row in rows:
                    columns = row.find_elements(By.TAG_NAME,'td')
                    ranking = int(columns[0].text)
                    university = columns[1].text
                    province = columns[2].text
                    type_ = columns[3].text
                    score = float(columns[4].text)
                    self.data_list.append([ranking, university, province, type_, score])

                # Try to click the '下一页' (next page) button
                try:
                    next_page_button = driver.find_element(By.XPATH,'/html/body/div/div/div/div[2]/div/div[3]/div[2]/div[1]/div/ul/li[9]')
                    if 'ant-pagination-disabled' in next_page_button.get_attribute('class'):
                        break
                    next_page_button.click()
                    time.sleep(10)  # wait for the next page to load
                except Exception as e:
                    print(e)
                    break

            # Put the data into a DataFrame, then write it to a CSV file
            df = pd.DataFrame(self.data_list, columns=['排名', '学校', '省市', '类型', '总分'])
            df.to_csv(self.output_file, index=False, encoding='utf-8-sig')

            print(f"Data saved to '{self.output_file}'.")

        except Exception as e:
            print("An exception occurred:", e)
        finally:
            if driver:
                driver.quit()

# Run the scraper
scraper = UniversityRankingScraper(
    url='https://www.shanghairanking.cn/rankings/bcur/2021',
    output_file='universities.csv'
)
scraper.scrape_data()

Results:

Reflections: compared with the earlier tasks, the main difference here is how pagination is implemented.
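
Since pagination is the main new point here, one possible refinement is to replace the fixed time.sleep(10) waits with Selenium's explicit waits. A minimal sketch follows (it reuses the ant-pagination-next / ant-pagination-disabled class names seen in the code above; the helper name click_next_page is made up for illustration):

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def click_next_page(driver, timeout=10):
    # Wait until the pagination "next" control is present on the page
    next_li = WebDriverWait(driver, timeout).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, 'li.ant-pagination-next'))
    )
    # A disabled next button means the last page has been reached
    if 'ant-pagination-disabled' in (next_li.get_attribute('class') or ''):
        return False
    next_li.click()
    return True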