第二次作业
码云连接:https://gitee.com/crazypsz/spider/commit/566b31106cde3cd68bd87c63e851b299542e6565
作业一
实验
要求:在中国气象网(http://www.weather.com.cn)给定城市集的 7日天气预报
代码:
import requests from bs4 import BeautifulSoup import sqlite3 import logging logging.basicConfig(level=logging.INFO) class WeatherDB: def __init__(self, db_name): self.db_name = db_name def __enter__(self): self.con = sqlite3.connect(self.db_name) self.cursor = self.con.cursor() self.create_table() return self def __exit__(self, exc_type, exc_val, exc_tb): self.con.commit() self.con.close() def create_table(self): self.cursor.execute( """ CREATE TABLE IF NOT EXISTS weathers ( id INTEGER PRIMARY KEY AUTOINCREMENT, city VARCHAR(16), date VARCHAR(16), weather VARCHAR(64), temp VARCHAR(32), UNIQUE(city, date) ) """ ) def insert(self, city, date, weather, temp): try: self.cursor.execute( "INSERT OR IGNORE INTO weathers (city, date, weather, temp) VALUES (?, ?, ?, ?)", (city, date, weather, temp) ) logging.info(f"Inserted data for {city} on {date}") except Exception as e: logging.error(str(e)) class WeatherForecast: HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } CITY_CODE = { "北京": "101010100", "上海": "101020100", "广州": "101280101", "深圳": "101280601", "福州": "101230101" } def __init__(self): pass def get_forecast(self, city): if city not in self.CITY_CODE: logging.error(f"{city} code cannot be found") return [] url = f"http://www.weather.com.cn/weather/{self.CITY_CODE[city]}.shtml" response = requests.get(url, headers=self.HEADERS) if response.status_code != 200: logging.error(f"Failed to get data for {city}") return [] response.encoding = 'utf-8' # Ensure using UTF-8 encoding soup = BeautifulSoup(response.text, 'lxml') lis = soup.select("ul[class='t clearfix'] li") forecasts = [] for li in lis: try: date = li.select_one('h1').text weather = li.select_one('p.wea').text temp = f"{li.select_one('p.tem span').text}/{li.select_one('p.tem i').text}" forecasts.append((city, date, weather, temp)) except Exception as e: logging.error(f"Error processing data for {city}: {str(e)}") return forecasts if __name__ == "__main__": with WeatherDB("weathers.db") as db: wf = WeatherForecast() for city in ["北京", "上海", "广州", "深圳", "福州"]: forecasts = wf.get_forecast(city) for forecast in forecasts: db.insert(*forecast) print(f"Inserted data for {city} on {forecast[1]}")
运行结果:
心得体会:熟悉了mysql数据库的使用方法
作业2
实验
要求:用 requests 和 BeautifulSoup 库方法定向爬取股票相关信息,并
存储在数据库中。
候选网站:东方财富网:https://www.eastmoney.com/
新浪股票:http://finance.sina.com.cn/stock/
代码:
import time import pandas as pd from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service class StockScraper: def __init__(self, url, num_pages_to_scrape, output_file): self.url = url self.num_pages_to_scrape = num_pages_to_scrape self.output_file = output_file self.driver = None def setup_driver(self): service = Service(executable_path="C:/Users/668/Desktop/chromedriver-win64/chromedriver.exe") self.driver = webdriver.Chrome(service=service) def close_driver(self): if self.driver: self.driver.quit() def fetch_stock_data(self): try: print("开始运行脚本...") self.setup_driver() print("打开网页...") self.driver.get(self.url) # 等待页面加载 time.sleep(10) # 可以根据实际情况调整等待时间 print("获取表格数据...") # 存储所有页的股票数据 all_data = pd.DataFrame() for _ in range(self.num_pages_to_scrape): try: # 获取表格数据 table = self.driver.find_element(By.XPATH, '/html/body/div[1]/div[2]/div[2]/div[5]/div/table') table_html = table.get_attribute('outerHTML') # 使用 pandas 读取表格数据 df = pd.read_html(table_html, header=0, converters={'代码': str})[0] # 删除 "相关链接" 和 "加自选" 列 df = df.drop(columns=['相关链接', '加自选']) # 确保股票代码是6位数,填充前导0 df['代码'] = df['代码'].apply(lambda x: "'" + str(x).zfill(6)) # 添加数据到总的 DataFrame 中 all_data = pd.concat([all_data, df], ignore_index=True) # 点击 "下一页" 按钮 next_button = self.driver.find_element(By.XPATH, '//a[text()="下一页"]') next_button.click() # 等待页面加载 time.sleep(10) # 可以根据实际情况调整等待时间 except Exception as e: print("获取数据时出错:", e) break # 如果出错,停止爬取 print(all_data) # 保存数据到 CSV 文件 all_data.to_csv(self.output_file, index=False, encoding='utf-8-sig') print("数据保存成功。") except Exception as e: print("发生异常:", e) finally: self.close_driver() # 运行函数 scraper = StockScraper( url='http://quote.eastmoney.com/center/gridlist.html#sz_a_board', num_pages_to_scrape=5, output_file='stocks.csv' ) scraper.fetch_stock_data()
运行结果:
心得体会:学会了如何寻找数据接口,做的过程粗心漏了反编译
作业3
实验
要求:爬取中国大学 2021 主榜
(https://www.shanghairanking.cn/rankings/bcur/2021)所有院校信息,并存储在数据库中,同时将浏览器 F12 调试分析的过程录制 Gif 加入至博客中。
代码:
import time import pandas as pd from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.service import Service class UniversityRankingScraper: def __init__(self, url, output_file): self.url = url self.output_file = output_file self.data_list = [] def setup_driver(self): service = Service(executable_path="D:/Google/chrome/Application/chromedriver-win64/chromedriver.exe") return webdriver.Chrome(service=service) def scrape_data(self): try: driver = self.setup_driver() # 请求网页 driver.get(self.url) # 无限循环,直到“下一页”按钮不可点击时跳出循环 while True: # 获取所有的行元素 rows = driver.find_elements(By.XPATH,'//tbody/tr') # 循环每行,获取并存储数据 for row in rows: columns = row.find_elements(By.TAG_NAME,'td') ranking = int(columns[0].text) university = columns[1].text province = columns[2].text type_ = columns[3].text score = float(columns[4].text) self.data_list.append([ranking, university, province, type_, score]) # 尝试点击“下一页”按钮 try: next_page_button = driver.find_element(By.XPATH,'/html/body/div/div/div/div[2]/div/div[3]/div[2]/div[1]/div/ul/li[9]') if next_page_button.get_attribute('class') == 'ant-pagination-disabled ant-pagination-next': break next_page_button.click() time.sleep(10) # 等待页面加载 except Exception as e: print(e) break # 将数据存储到 DataFrame,然后写入 CSV 文件 df = pd.DataFrame(self.data_list, columns=['排名', '学校', '省市', '类型', '总分']) df.to_csv(self.output_file, index=False, encoding='utf-8-sig') print(f"数据已保存到 '{self.output_file}' 文件。") except Exception as e: print("发生异常:", e) finally: if driver: driver.quit() # 运行函数 scraper = UniversityRankingScraper( url='https://www.shanghairanking.cn/rankings/bcur/2021', output_file='universities.csv' ) scraper.scrape_data()
运行结果:
心得体会:与以前的主要差别在于如何实现翻页