2023数据采集与融合技术实践作业三

发布时间 2023-10-24 00:08:51作者: lu_lu_l

作业①:

  • 要求:指定一个网站,爬取这个网站中的所有的所有图片,例如:中国气象网(http://www.weather.com.cn)。使用scrapy框架分别实现单线程和多线程的方式爬取。务必控制总页数(学号尾数2位)、总下载的图片数量(尾数后3位)等限制爬取的措施。

  • 输出信息: 将下载的Url信息在控制台输出,并将下载的图片存储在images子文件中,并给出截图。

  • Gitee文件夹链接:Gitee作业链接

(1)代码和图片

  • 单线程
    MySpider.py
import scrapy
from scrapy.selector import Selector
from ..items import Weather1Item

class MySpider(scrapy.Spider):
    name = "mySpider"
    allowed_domains = ["www.weather.com.cn"]
    start_urls = ["http://www.weather.com.cn/"]
    max_pages = 59 # 最大下载页数

    def __init__(self):
        super(MySpider, self).__init__()
        self.page_count = 0
        self.img_count = 0

    def parse(self, response):
        selector = Selector(response)

        # 解析当前页面中的所有图片链接
        srcs = selector.xpath('//img/@src').extract()
        for src in srcs:
            item = Weather1Item()
            item['src'] = src
            yield item # 将item发送到pipeline中进行处理

            self.img_count += 1
            if self.img_count >= self.max_pages:
                break
        # 解析当前页面中的所有链接,递归地访问每个链接并进行解析
        pages = selector.xpath('//a/@href').extract()
        for page in pages:
            url = response.urljoin(page) # 将相对链接转化为绝对链接
            yield scrapy.Request(url=url, callback=self.parse)

            self.page_count += 1
            if self.page_count >= self.max_pages:
                break

pipelines.py

import os
import urllib

class Weather1Pipeline:
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"
    }

    max_pages = 59  # 最大下载页数

    def __init__(self):
        self.page_count = 1

    def process_item(self, item, spider):
        try:
            src = item['src'] # 获取图片链接
            if src[-4:] == ".jpg":
                ext = ".jpg" # 如果链接以.jpg结尾,则设置文件扩展名为.jpg
            elif src[-4:] == ".png":
                ext = ".png"  # 如果链接以.png结尾,则设置文件扩展名为.png
            else:
                ext = "" # 否则,不设置文件扩展名

            # 如果images目录不存在,则创建它
            # (由于码云一次最多上传10个文件,所以把images删了,不过运行代码之后会自动创建images子文件夹)
            if not os.path.exists("images"):
                os.makedirs("images")

            req = urllib.request.Request(src, headers=self.headers)
            data = urllib.request.urlopen(req, timeout=100).read()

            # 将下载的图片保存在images目录下
            with open(f"images/{self.page_count}{ext}", "wb") as fobj:
                fobj.write(data)

            print(f"Downloaded images/{self.page_count}{ext}")
            self.page_count += 1

            # 达到指定的最大页数,则停止爬取,关闭爬虫
            if self.page_count >= self.max_pages:
                spider.crawler.engine.close_spider(spider, "Reached the maximum number of pages")
        except Exception as err:
            print(err)

        return item

items.py

import scrapy

class Weather1Item(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    src = scrapy.Field()

settings.py

ITEM_PIPELINES = {
    'Weather_1.pipelines.Weather1Pipeline': 300,
}

run.py

from scrapy import cmdline
cmdline.execute("scrapy crawl mySpider -s LOG_ENABLED=False".split())
  • 运行结果图片
图片名称
  • 多线程
    MySpider.py
import scrapy
from scrapy.selector import Selector
from ..items import Weather2Item
from concurrent.futures import ThreadPoolExecutor

class MySpider(scrapy.Spider):
    name = "mySpider"
    allowed_domains = ["www.weather.com.cn"]
    start_urls = ["http://www.weather.com.cn/"]
    max_pages = 59  # 最大下载页数

    def __init__(self):
        super(MySpider, self).__init__()
        self.page_count = 0  # 计数器,跟踪已处理的页面数量
        self.img_count = 0  # 计数器,跟踪已下载的图片数量
        self.executor = ThreadPoolExecutor(max_workers=5)  # 创建一个包含5个工作线程的线程池

    def parse(self, response):
        selector = Selector(response)

        # 提取页面中的图片链接并创建对应的Item对象
        srcs = selector.xpath('//img/@src').extract()
        for src in srcs:
            item = Weather2Item()
            item['src'] = src
            yield item

            self.img_count += 1
            if self.img_count >= self.max_pages:
                break

        # 提取页面中的链接,并发送请求继续解析下一个页面
        pages = selector.xpath('//a/@href').extract()
        for page in pages:
            url = response.urljoin(page)
            yield scrapy.Request(url=url, callback=self.parse)

            self.page_count += 1
            if self.page_count >= self.max_pages:
                break

    # 修改爬虫关闭方法,确保在所有线程完成后再关闭爬虫
    def closed(self, reason):
        self.executor.shutdown()
        print("爬取完成")

pipelines.py

import os
import urllib
from concurrent.futures import ThreadPoolExecutor

class Weather2Pipeline:
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.121 Safari/537.36"
    }

    max_pages = 59  # 最大下载页数

    def __init__(self):
        self.page_count = 1
        self.executor = ThreadPoolExecutor(max_workers=5)  # 创建一个包含5个工作线程的线程池

    def process_item(self, item, spider):
        try:
            src = item['src']
            if src[-4:] == ".jpg":
                ext = ".jpg" # 如果链接以.jpg结尾,则设置文件扩展名为.jpg
            elif src[-4:] == ".png":
                ext = ".png"  # 如果链接以.png结尾,则设置文件扩展名为.png
            else:
                ext = "" # 否则,不设置文件扩展名

            # 如果images目录不存在,就创建它
            # (由于码云一次最多上传10个文件,所以把images删了,不过运行代码之后会自动创建images子文件夹)
            if not os.path.exists("images"):
                os.makedirs("images")

            req = urllib.request.Request(src, headers=self.headers)

            # 使用线程池执行图片下载任务
            future = self.executor.submit(urllib.request.urlopen, req, timeout=100)
            data = future.result().read()

            # 将下载的图片保存在images目录下
            with open(f"images/{self.page_count}{ext}", "wb") as fobj:
                fobj.write(data)

            print(f"Downloaded images/{self.page_count}{ext}")
            self.page_count += 1

            # 达到指定的最大页数,则停止爬取
            if self.page_count >= self.max_pages:
                spider.crawler.engine.close_spider(spider, "Reached the maximum number of pages")
        except Exception as err:
            print(err)

        return item

    # 修改关闭方法,确保在所有线程完成后再关闭爬虫
    def close_spider(self, spider):
        self.executor.shutdown()

items.py

import scrapy

class Weather2Item(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    src = scrapy.Field()

settings.py

ITEM_PIPELINES = {
    'Weather_2.pipelines.Weather2Pipeline': 300,
}

run.py

from scrapy import cmdline
cmdline.execute("scrapy crawl mySpider -s LOG_ENABLED=False".split())
  • 运行结果图片
图片名称

(2)心得体会

上网查阅知道,在scrapy中设置多线程不需要通过thread,只需要修改settings.py中的CONCURRENT_REQUESTS的值即可,CONCURRENT_REQUESTS 是 Scrapy 的设置之一,它控制着同时发送给目标网站的请求数量。增加 CONCURRENT_REQUESTS 的值可以在单线程的情况下模拟多线程的效果,但这实际上只是增加了同时处理请求的能力,并不能真正实现并行处理。所以还是通过concurrent.futures模块中的ThreadPoolExecutor类实现的,创建了一个包含5个工作线程的线程池,减少了启动和销毁线程的开销,还了解了异步编程和多线程/多进程编程。

作业②

  • 要求:熟练掌握 scrapy 中 Item、Pipeline 数据的序列化输出方法;使用scrapy框架+Xpath+MySQL数据库存储技术路线爬取股票相关信息。

  • 候选网站:东方财富网:https://www.eastmoney.com/

  • 输出信息:MySQL数据库存储和输出格式如下:
    表头英文命名例如:序号id,股票代码:bStockNo……,由同学们自行定义设计

序号 股票代码 股票名称 最新报价 涨跌幅 涨跌额 成交量 振幅 最高 最低 今开 昨收
1 688093 N世华 28.47 10.92 26.13万 7.6亿 22.34 32.0 28.08 30.20 17.55
2......

(1)代码和图片

  • 代码
    MySpider.py
import scrapy
import re
from ..items import ShareItem

class MySpider(scrapy.Spider):
    name = 'mySpider'
    # start_urls 存放要爬取网页的 URL,这里只有一个 URL,即股票实时数据网址
    start_urls =['https://83.push2.eastmoney.com/api/qt/clist/get?cb=jQuery1124025224620015255605_16966941128'
                 '61&pn=1&pz=20&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&wbp2u=|0|0|0|web&'
                 'fid=f3&fs=m:0+t:6,m:0+t:80,m:1+t:2,m:1+t:23,m:0+t:81+s:2048&fields=f1,f2,f3,f4,f5,f6,f7,f8,'
                 'f9,f10,f12,f13,f14,f15,f16,f17,f18,f20,f21,f23,f24,f25,f22,f11,f62,f128,f136,f115,f152&_='
                 '1696694112862']
    page=1 # 记录爬取的页面数

    def parse(self, response):
        try:
            # 获取响应的字符串
            data=response.body.decode()
            # 正则表达式匹配数据,获取完整的股票信息数据
            pat = "\"data\":.*\]"
            data = re.compile(pat, re.S).findall(data)
            # 去除前面的total和diff数据
            data[0] = data[0][29:]
            # 将股票信息分割开
            datas = data[0].split('},{')
            stocks = []
            for i in range(len(datas)):
                stock = datas[i].replace('"', "").split(',')  # 不同信息分割,并且去掉引号
                for j in range(len(stock)):
                    # 冒号分割,将每个字段的值提取出来
                    t = stock[j].split(':')
                    stock[j] = t[1]
                # 创建 ShareItem 实例,存储股票信息
                item=ShareItem()
                item['code']=stock[11]
                item['name'] = stock[13]
                item["latest_price"] = stock[1]
                item["price_limit"] = stock[2]
                item["price_range"] = stock[3]
                item["turnover"] = stock[4]
                item["volume_transaction"] = stock[5]
                item["amplitude"] = stock[6]
                item["highest"] = stock[14]
                item["lowest"] = stock[15]
                item["today_open"] = stock[16]
                item["yesterday_close"] = stock[17]
                # print(item['code'])
                # 将当前股票数据传递给 scrapy 引擎
                yield item
            self.page+=1
            #翻页处理,爬取前9页数据
            if self.page < 10:
                # 修改 URL 中的 pn 参数实现翻页
                next_url = 'https://83.push2.eastmoney.com/api/qt/clist/get?cb=jQuery1124025224620015255605_1' \
                           '696694112861&pn='+str(self.page)+'&pz=20&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f742' \
                           '6281&fltt=2&invt=2&wbp2u=|0|0|0|web&fid=f3&fs=m:0+t:6,m:0+t:80,m:1+t:2,m:1+t:23,' \
                           'm:0+t:81+s:2048&fields=f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f12,f13,f14,f15,f16,f17,f18,' \
                          'f20,f21,f23,f24,f25,f22,f11,f62,f128,f136,f115,f152&_=1696694112862'
                # 再次发起 scrapy.Request 请求,继续爬取下一页数据
                yield scrapy.Request(next_url,callback = self.parse)
        except Exception as err:
            print(err)

pipelines.py

import mysql.connector

class SharePipeline(object):

    def open_spider(self, spider):
        print("opened")
        try:
            # 建立与MySQL数据库的连接
            self.con = mysql.connector.connect(
                host='127.0.0.1',
                user='root',
                password='123456',
                database='shares'
            )
            print("opened1")
            self.cursor = self.con.cursor()
            # 删除已存在的stock表,并创建新的stock表用于存储股票数据
            self.cursor.execute('DROP TABLE IF EXISTS stock')
            self.cursor.execute('''
                CREATE TABLE IF NOT EXISTS stock (  
                    id INT(11) NOT NULL AUTO_INCREMENT,
                    code VARCHAR(20),   
                    name VARCHAR(100),  
                    latest_price DECIMAL(10, 2),   
                    price_limit DECIMAL(10, 2),     
                    price_range DECIMAL(10, 2),     
                    turnover DECIMAL(15, 5),   
                    volume_transaction DECIMAL(15, 5), 
                    amplitude DECIMAL(15, 5),  
                    highest DECIMAL(10, 2), 
                    lowest DECIMAL(10, 2),  
                    today_open DECIMAL(10, 2), 
                    yesterday_close DECIMAL(10, 2), 
                    PRIMARY KEY (id)
                )
            ''')
            self.opened = True
        except Exception as err:
            print(err)
            self.opened = False

    def process_item(self, item, spider):
        try:
            if self.opened:
                # 将爬取到的股票数据插入到stock表中
                self.cursor.execute('''
                INSERT INTO stock (code, name, latest_price, price_limit, price_range, turnover, volume_transaction,
amplitude, highest, lowest, today_open, yesterday_close) 
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ''', (item['code'],  #代码
                      item['name'],  #名称
                      item['latest_price'],  #最新价
                      item['price_limit'],  #涨跌幅
                      item['price_range'],  #涨跌额
                      item['turnover'],  #成交量
                      item['volume_transaction'],  #成交额
                      item['amplitude'],  #振幅
                      item['highest'],  #最高
                      item['lowest'],   #最低
                      item['today_open'],   #今开
                      item['yesterday_close']   #昨收
                ))
                self.con.commit()
        except Exception as err:
            print(err)
        return item

    def close_spider(self, spider):
        if self.opened:
            # 关闭与MySQL数据库的连接
            self.con.close()
            self.opened = False
        print("closed")


items.py

import scrapy

class ShareItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    code = scrapy.Field()  #代码
    name= scrapy.Field()   #名称
    latest_price = scrapy.Field()  #最新价
    price_limit = scrapy.Field()  #涨跌幅
    price_range = scrapy.Field()  #涨跌额
    turnover = scrapy.Field()  #成交量
    volume_transaction = scrapy.Field()  #成交额
    amplitude = scrapy.Field()  #振幅
    highest= scrapy.Field()  #最高
    lowest = scrapy.Field()  #最低
    today_open = scrapy.Field()  #今开
    yesterday_close = scrapy.Field()  #昨收

settings.py

ROBOTSTXT_OBEY = False
ITEM_PIPELINES = {
    'share.pipelines.SharePipeline': 300,
}
DEFAULT_REQUEST_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3141.8 Safari/537.36}"
    }

run.py

from scrapy import cmdline
cmdline.execute("scrapy crawl mySpider -s LOG_ENABLED=False".split())
  • 运行结果图片
    打开Navicat for MySQL查看数据库里的表格
图片名称

(2)心得体会

前面已经爬过这个网站是使用抓包的方式,已经知道如何获取那些需要的数据,现在用scrapy框架只是换个方式,在MySpider写爬虫程序,通过观察url可以看到只有“pn”的变化,所以通过url参数构造下一页的url实现翻页爬取功能,这样就可以限制爬取页数,对网页链接实现翻页有了更深的认识,在Item、Pipeline 编写数据的序列化输出方法,因为数据要保存到MySQL中,所以通过使用mysql-connector连接到MySQL数据库并执行SQL语句实现创建表、插入数据等操作,学习到了如何在Python中与数据库进行交互。在这里遇见问题,代码真的很神奇,明明没有问题,运行之后一直不出现结果,在MySQL中的shares数据库就是不出现表格stock,玄学的事情,刷新也没有用。后来一直以为是代码问题,但是在爬虫程序加调试信息没问题,加打印信息,数据也都有爬出来,就是不到数据库中,后来经过一位朋友的帮忙,没有改动任何代码,但是神奇的是刷新之后出现了表格。还有不要忘记修改settings.py,把ROBOTSTXT_OBEY = True改为False。

作业③:

  • 要求:熟练掌握 scrapy 中 Item、Pipeline 数据的序列化输出方法;使用scrapy框架+Xpath+MySQL数据库存储技术路线爬取外汇网站数据。
  • 候选网站:中国银行网:https://www.boc.cn/sourcedb/whpj/
  • 输出信息:
Currency TBP CBP TSP CSP Time
阿联酋迪拉姆 198.58 192.31 199.98 206.59 11:27:14

(1)代码和图片

  • 代码
    MySpider.py
import scrapy
from scrapy import Selector
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from ..items import BankItem

class MySpider(scrapy.Spider):
    name = "mySpider"
    start_urls = ["https://www.boc.cn/sourcedb/whpj/",] # 起始URL

    def __init__(self):
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        self.driver = webdriver.Chrome(options=chrome_options)# 初始化ChromeDriver
        self.page = 1 # 用于记录当前爬取的页数

    def start_requests(self):
        while self.page < 10:
            # 指定要爬取的页数,爬取9页
            url = f'https://www.boc.cn/sourcedb/whpj/index_{self.page}.html' # 构造待爬取的URL
            yield scrapy.Request(url, callback=self.parse) # 返回构造好的Request对象,并指定回调函数为parse
            self.page += 1

    def parse(self, response):
        self.driver.get(response.url) # 使用Selenium打开目标页面
        sel = Selector(text=self.driver.page_source) # 使用Selector解析HTML页面源码
        rows = sel.xpath('//table//tr[position()>1]')
        for row in rows:
            # 实例化一个BankItem用于保存爬取到的结果
            item = BankItem()
            item['currency'] = row.xpath('./td[1]/text()').extract_first()
            item['tbp'] = row.xpath('./td[2]/text()').extract_first()
            item['cbp'] = row.xpath('./td[3]/text()').extract_first()
            item['tsp'] = row.xpath('./td[4]/text()').extract_first()
            item['csp'] = row.xpath('./td[5]/text()').extract_first()
            item['time'] = row.xpath('./td[8]/text()').extract_first()
            # print(item['currency'])
            yield item

    def closed(self, spider):
        # 在关闭爬虫时,关闭ChromeDriver
        self.driver.quit()

pipelines.py


import mysql.connector

class BankPipeline(object):

    def open_spider(self, spider):
        print("opened")
        try:
            # 连接MySQL数据库
            self.con = mysql.connector.connect(
                host='127.0.0.1',
                user='root',
                password='123456',
                database='bank'
            )
            print("opened1")  # 打印提示信息
            self.cursor = self.con.cursor()
            # 删除已存在的表格(如果存在)
            self.cursor.execute("DROP TABLE IF EXISTS currency")
            # 创建新的表格currency
            self.cursor.execute(
                """
                CREATE TABLE currency (
                ID INT AUTO_INCREMENT PRIMARY KEY, 
                Currency VARCHAR(255), 
                TBP DOUBLE, 
                CBP DOUBLE,
                TSP DOUBLE, 
                CSP DOUBLE, 
                Time VARCHAR(255))
                """)
            self.opened = True
        except Exception as err:
            print(err)
            self.opened = False

    def process_item(self, item, spider):
        try:
            if self.opened:
                # 向表格中插入数据
                self.cursor.execute(
                    "INSERT INTO currency (Currency, TBP, CBP, TSP, CSP, Time) VALUES (%s, %s, %s, %s, %s, %s)",
                    (item["currency"],
                     item["tbp"],
                     item["cbp"],
                     item["tsp"],
                     item["csp"],
                     item["time"]))
                self.con.commit()
        except Exception as err:
            print(err)
        return item

    def close_spider(self, spider):
        if self.opened:
            # 关闭数据库连接
            self.con.close()
            self.opened = False
        print("closed")

items.py

import scrapy

class BankItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
        currency = scrapy.Field()
        tbp = scrapy.Field()
        cbp = scrapy.Field()
        tsp = scrapy.Field()
        csp = scrapy.Field()
        time = scrapy.Field()

setting.py

ROBOTSTXT_OBEY = False
ITEM_PIPELINES = {
    'bank.pipelines.BankPipeline': 300,
}
USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
LOG_LEVEL = 'ERROR'

run.py

from scrapy import cmdline
cmdline.execute("scrapy crawl mySpider -s LOG_ENABLED=False".split())
  • 运行结果图片
    打开Navicat for MySQL软件查看数据库里的表格,有的值是null,是因为爬取到的时候本来就没有值
图片名称

(2)心得体会

使用scrapy框架+Xpath+MySQL数据库存储和selenium技术路线爬取外汇网站数据,本来是没有打算使用selenium的,但不知道为什么就是爬取不到数据,所以就去尝试加selenium,有一次就不小心出来结果,但是不知道是没加selenium还是加了selenium爬取出来的,后来尝试不加但是没有爬取出来,后来又改为加selenium但修改了Xpath,可能还是因为xpath没有爬取正确的,一定要看好页面结构,正确使用xpath,其次以后就知道一定要加打印信息去测试,还有日志。这里的数据也是用MySQL数据库存储,所以对在Python中与数据库进行交互有更熟练的操作。
总之这个代码学习了将scrapy和selenium结合起来,selenium用来打开那些页面,而scrapy则负责抓取数据并处理抓取过程。