Extracting Applicant and Product Names

Posted 2024-01-11 15:35:26 · Author: 明媚的夏午
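
The script below walks the announcement list at www.cmde.org.cn (China's Center for Medical Device Evaluation), opens each announcement in a Chrome session running inside a virtual display (pyvirtualdisplay) and driven by undetected_chromedriver, then pulls out product-name / applicant pairs (from inline text where possible, from an embedded table otherwise) and saves everything to an Excel file with pandas.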

 

import random
import re
import os
import time
import pandas as pd
from pyvirtualdisplay import Display
import undetected_chromedriver as uc
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from bs4 import BeautifulSoup


class INTERFACING:

    def __init__(self):
        self.driver_initialized = False
        self.driver = None
        self.MAX_TRIALS = 2
        self.chrome_version = get_google_chrome_version()

    def make_soup(self):
        return BeautifulSoup(self.driver.page_source, 'lxml')

    def current_url(self):
        return self.driver.current_url

    def get_driver(self):
        chrome_options = uc.ChromeOptions()
        # chrome_options.add_argument("--headless")
        chrome_options.add_argument("--window-size=1920.,1080")
        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-popup-blocking")
        chrome_options.add_argument("--profile-directory=Default")
        chrome_options.add_argument("--ignore-certificate-errors")
        chrome_options.add_argument("--disable-plugins-discovery")
        chrome_options.add_argument("--incognito")
        chrome_options.add_argument("--no-first-run")
        chrome_options.add_argument("--no-service-autorun")
        chrome_options.add_argument("--no-default-browser-check")
        chrome_options.add_argument("--password-store=basic")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument('--disable-application-cache')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument("--disable-setuid-sandbox")
        chrome_options.add_argument(
            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36"
        )
        # version_main must match the installed Chrome major version (as an int)
        self.driver = uc.Chrome(options=chrome_options, version_main=int(self.chrome_version))
        time.sleep(10)  # give the freshly launched browser time to settle
        self.driver_initialized = True

    def close_driver(self):
        self.driver.quit()

    def get_selenium_response(self, url):
        if not self.driver_initialized:
            self.get_driver()
        self.driver.get(url)
        time.sleep(3)  # crude fixed wait for the page to render
        return self.make_soup()

    def get_page_source(self):
        return self.driver.page_source

    def clicking(self, xpath):
        elem = self.driver.find_element(By.XPATH, xpath)
        elem.click()
        time.sleep(random.randint(2, 3))

    def entering_values(self, xpath, value):
        elem = self.driver.find_element(By.XPATH, xpath)
        elem.clear()
        elem.send_keys(value)
        time.sleep(random.randint(2, 4))

    def send_keys(self, xpath):
        self.driver.find_element(By.XPATH, xpath).send_keys(Keys.RETURN)

    def going_back(self):
        self.driver.execute_script("window.history.go(-1)")
        time.sleep(1)

    def refresh_page(self):
        self.driver.refresh()

    def close_handle(self):
        self.driver.close()

    def get_current_handle(self):
        return self.driver.current_window_handle

    def get_all_handles(self):
        return self.driver.window_handles

    def switch_to_window(self, handle):
        self.driver.switch_to.window(handle)
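
# A minimal usage sketch for the wrapper above, using the same list page the
# script scrapes further down; close_driver() shuts the browser down when done:
#
#   bot = INTERFACING()
#   soup = bot.get_selenium_response("https://www.cmde.org.cn/xwdt/shpgzgg/cxyxgsh/index.html")
#   print(soup.select_one('.list'))
#   bot.close_driver()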


def get_google_chrome_version():
    try:
        search_pattern = r"(\d+?)\."
        chrome_version = os.popen("google-chrome -version").read()
        chrome_version = re.search(search_pattern, chrome_version).group(1)
        return chrome_version
    except Exception as e:
        raise Exception(f"Failed to get the Google Chrome version; details: {e}")


def extract_product_applicant(soup):
    # Plain-text layout: "产品名称:xxx" / "申请人:xxx" inside text nodes
    product_pattern = re.compile(r'产品名称:\s*([^<>]+)')
    applicant_pattern = re.compile(r'申\s*请\s*人:\s*([^<>]+)')

    product_matches = soup(string=product_pattern)
    applicant_matches = soup(string=applicant_pattern)

    products = [product_pattern.findall(match)[0] for match in product_matches]
    applicants = [applicant_pattern.findall(match)[0] for match in applicant_matches]

    if not products:
        # Fallback patterns for pages where the label text carries a literal </strong>
        product_pattern = re.compile(r"产品名称</strong>:\s*([^<>]+)")
        applicant_pattern = re.compile(r"申\s*请\s*人</strong>:\s*([^<>]+)")

        product_matches = soup(string=product_pattern)
        applicant_matches = soup(string=applicant_pattern)

        products = [product_pattern.findall(match)[0] for match in product_matches]
        applicants = [applicant_pattern.findall(match)[0] for match in applicant_matches]

    # zip truncates to the shorter list if the two counts ever differ
    return list(zip(products, applicants))
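
# A quick sketch of what the extractor returns for a typical detail page
# (the HTML fragment is hypothetical, modeled on the cmde.org.cn layout):
#
#   sample = BeautifulSoup("<p>产品名称:某某导管</p><p>申请人:某某公司</p>", "lxml")
#   extract_product_applicant(sample)  # -> [("某某导管", "某某公司")]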


base_url = "https://www.cmde.org.cn"
result_df = pd.DataFrame(columns=["index", "time", "product_name", "product_company", "ID", "reason"])
num = -1
with Display(visible=False, size=(1920, 1080)) as display:
    for page in range(1, 10):
        if page == 1:
            soup = INTERFACING().get_selenium_response("https://www.cmde.org.cn/xwdt/shpgzgg/cxyxgsh/index.html")
        else:
            soup = INTERFACING().get_selenium_response(
                f"https://www.cmde.org.cn/xwdt/shpgzgg/cxyxgsh/index_{page - 1}.html")

        groups_selector = soup.select('.list > ul >li')
        for index, group in enumerate(groups_selector[17:]):
            group_href = group.find('a')['href']
            update_time = group.find('span').text[1:-1]
            group_href = group_href if "http" in group_href else base_url + "/" + group_href.replace("../", "")
            detail_soup = INTERFACING().get_selenium_response(group_href)
            if not detail_soup:
                print(f"第{page}页,第{index}条没有抓取到")
                continue

            data = extract_product_applicant(detail_soup)
            if not data:
                # No inline labels found; fall back to the embedded table
                table = detail_soup.find('table')
                if not table:
                    print(f"!!! Page {page}, item {index}: nothing extracted")
                    continue
                # Extract the table body rows
                rows = table.find_all('tr')
                data = []
                for row in rows[1:]:
                    cells = row.find_all('td')
                    values = [cell.text.strip() for cell in cells]
                    data.append(values)

                # The first row holds the headers
                header_cells = rows[0].find_all('td')
                headers = [cell.text.strip() for cell in header_cells]

                # Build a DataFrame keyed by the page's own Chinese headers
                df = pd.DataFrame(data, columns=headers)
                print(df)
                for i in range(len(df)):
                    num += 1
                    result_df.loc[num, "index"] = num
                    result_df.loc[num, "time"] = update_time
                    result_df.loc[num, "product_name"] = df.loc[i, '产品名称']
                    result_df.loc[num, "product_company"] = df.loc[i, '申请人']
                    result_df.loc[num, "ID"] = df.loc[i, '受理号']
                    result_df.loc[num, "reason"] = df.loc[i, '同意理由']
                result_df.to_excel("result.xlsx")  # checkpoint after every announcement
                print(f"Page {page}, item {index}: done")
                continue

            for product_name, applicant in data:
                num += 1
                result_df.loc[num, "index"] = num
                result_df.loc[num, "time"] = update_time
                result_df.loc[num, "product_name"] = product_name
                result_df.loc[num, "product_company"] = applicant
            result_df.to_excel("result.xlsx")  # checkpoint after every announcement
            print(f"Page {page}, item {index}: done")

scraper.close_driver()