Compare commits

..

2 Commits

Author SHA1 Message Date
root
7af1c6acbe refactor(crawler): use webdriver-manager and add debug logging 2025-12-04 11:35:17 +08:00
Ching L
6b5c4a3a1b refactor(crawler): replace requests with selenium for web scraping
- Replaced requests + BeautifulSoup with Selenium WebDriver
  - Added Chrome WebDriver with headless mode support
  - Updated HTML element extraction to use Selenium locators
  - Fixed logger path to use local directory for cross-platform compatibility
  - Added proper error handling for element extraction
  - Maintained compatibility with existing Redis and Mastodon functionality
2025-12-03 16:07:42 +08:00

View File

@@ -2,21 +2,30 @@
import requests import requests
from bs4 import BeautifulSoup
import re import re
import redis import redis
import json import json
import feedparser
import cloudscraper
from loguru import logger from loguru import logger
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
import time
from mastodon import Mastodon from mastodon import Mastodon
# logging.basicConfig(filename='/root/develop/log/chh-craler.log', level=logging.INFO) # Configure logger - use local path for macOS
# logger = logging.getLogger('/root/develop/log/chh-craler.log') import os
logger.add('/home/ching/logs/chh-craler.log', level='INFO') log_dir = './logs'
scraper = cloudscraper.create_scraper() if not os.path.exists(log_dir):
os.makedirs(log_dir)
logger.add('./logs/chh-crawler.log', level='INFO')
# connect to redis with password # connect to redis with password
redis_db = redis.StrictRedis(host="localhost", redis_db = redis.StrictRedis(host="localhost",
@@ -27,30 +36,6 @@ mastodon_client = Mastodon(
api_base_url = 'https://nofan.xyz/' api_base_url = 'https://nofan.xyz/'
) )
# 主分类和下级分类的映射关系
CATEGORY_MAPPING = {
'照片': ['人物肖像', '风景游记', '微距静物', '人文扫街', '动物植物', '其它作品'],
'电脑': ['配件开箱', '整机搭建', '升级改造', '桌面书房', 'QNAP专区'],
'掌设': ['智能手机', '智能穿戴', '笔电平板', '周边附件', 'ThinkPad专区'],
'摄影': ['微单卡片', '单反单电', '经典旁轴', '怀旧菲林', '影音摄像', '周边附件'],
'汽车': ['买菜车', '商务车', '性能车', '旅行车', 'SUV', 'MPV', '摩托轻骑', '改装配件', 'CHH Auto Club'],
'单车': ['山地车', '公路车', '折叠车', '休旅车'],
'模型': ['人偶手办', '比例成品', '拼装自组', 'RC遥控'],
'败家': ['收藏爱好', '品质生活', '数码前沿', '小资情调', '女王最大', '吃喝玩乐', '育儿分享'],
'时尚': ['鞋类', '服饰', '箱包'],
'腕表': ['机械表', '电子表'],
'视听': ['耳机耳放', '音箱功放', '解码转盘', '随身设备', '唱片录音'],
'美食': ['当地美食', '世界美食', '私房菜品', '美食器材']
}
def get_main_category(sub_category):
"""根据下级分类获取主分类"""
for main_category, sub_categories in CATEGORY_MAPPING.items():
if sub_category in sub_categories:
return main_category
# 如果不是下级分类或者是主分类本身,返回 None
return None
def save_to_redis(article): def save_to_redis(article):
key = 'chh-article:%s' % article['article_id'] key = 'chh-article:%s' % article['article_id']
@@ -71,55 +56,160 @@ def url_shorten(url):
else: else:
return url return url
def setup_driver():
"""Configure and initialize Chrome WebDriver"""
chrome_options = Options()
chrome_options.add_argument('--headless') # Run in headless mode
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
# Set user agent
user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
chrome_options.add_argument(f'user-agent={user_agent}')
chrome_options.add_argument('--window-size=1920,1080')
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
return driver
def crawler(): def crawler():
rss_url = 'https://www.chiphell.com/portal.php?mod=rss' # Initialize Selenium WebDriver
feed = feedparser.parse(rss_url) home_url = 'https://www.chiphell.com/'
driver = setup_driver()
try:
logger.info(f"Starting to crawl {home_url}")
driver.get(home_url)
for entry in feed.entries: # Wait for page to load
# 标题 time.sleep(3)
title = entry.title
# 链接
url = entry.link
# 发布时间
date = entry.published
# 作者
author = entry.get('author', '未知')
# 分类(第一个 tag
category = entry.tags[0]['term'] if 'tags' in entry and entry.tags else '未分类'
# 获取主分类
main_category = get_main_category(category)
# 简介内容
content = entry.summary
content.removesuffix('...')
# 图片链接(从 links 中的 enclosure 找 image/jpeg # Debug: log page info
img_url = '' logger.info(f"Page title: {driver.title}")
for link in entry.get('links', []): logger.info(f"Current URL: {driver.current_url}")
if link.get('type', '') == 'image/jpeg' and link.get('rel') == 'enclosure': page_source = driver.page_source
img_url = link.get('href') logger.info(f"Page source length: {len(page_source)}")
img_url = img_url.replace('https://www.chiphell.com/', '') logger.info(f"Page source preview (first 2000 chars):\n{page_source[:2000]}")
break
# Find all chiphell_box elements and get the last one
boxes = driver.find_elements(By.CLASS_NAME, 'chiphell_box')
if not boxes:
logger.error("No chiphell_box elements found")
return
last_box = boxes[-1]
# Find the acon div within the last box
acon_div = last_box.find_element(By.CLASS_NAME, 'acon')
# Find the ul with id='threadulid'
ul = acon_div.find_element(By.ID, 'threadulid')
# Find all li elements
li_list = ul.find_elements(By.TAG_NAME, 'li')
# a list item is like:
# <li>
# <a href="article-30010-1.html" target="_blank" class="tm01 cl">
# <img src="https://static.chiphell.com/portal/202307/16/080741btbodlblx8nwzn74.jpg">
# </a>
# <div class="tmpad cl">
# <a href="article-30010-1.html" target="_blank" class="tm03 cl">一点小收藏—AP皇家橡树</a>
# <div class="avart">
# <a href="space-username-幼月.html" target="_blank" class="tmava"><img src="./data/avatar/000/27/86/96_avatar_small.jpg"></a>
# <div class="avimain cl">
# <a href="space-username-幼月.html" target="_blank" class="">幼月</a>
# </div>
# <div class="avimain2 cl">
# <span style="padding-left: 0px;">2023/07/16</span>
# <span class="avie">3231</span>
# <span class="arep">48</span>
# <a href="portal.php?mod=list&amp;catid=128" target="_blank" class="asort cl">腕表</a>
# </div>
# </div>
# <div class="tm04 cl">
# 又是我胡汉三,最近不知道怎么捅了腕表品牌的窝了。。三天两头给我塞东西。。所以作业按照混乱发!
# 没有文笔,只有碎碎念。
# ROYAL OAK 15551OR
# 女王的第一块AP最早许愿是蓝盘但是到的是白盘不过白盘也非常美 ...</div>
# </div>
# </li>
# get the article img, title, author, date, category, content and url
for li in li_list:
try:
# get the article img
img = li.find_element(By.TAG_NAME, 'img')
img_url = img.get_attribute('src')
# get the article title and URL
title_element = li.find_element(By.CLASS_NAME, 'tm03')
title = title_element.text
url = title_element.get_attribute('href')
if not url.startswith('http'):
url = home_url + url
# get the article id
article_id_match = re.search(r'article-(\d+)-1\.html', url)
article_id = article_id_match.group(1) if article_id_match else None
if not article_id:
continue
# get the article author
try:
avimain_div = li.find_element(By.CLASS_NAME, 'avimain')
author = avimain_div.find_element(By.TAG_NAME, 'a').text
except:
author = 'Unknown'
# get the article date
try:
avimain2_div = li.find_element(By.CLASS_NAME, 'avimain2')
date_span = avimain2_div.find_element(By.CSS_SELECTOR, 'span[style*="padding-left"]')
date = date_span.text
except:
date = 'Unknown'
# get the article category
try:
category_element = avimain2_div.find_element(By.CLASS_NAME, 'asort')
category = category_element.text
except:
category = 'Unknown'
# get the article content
try:
content_element = li.find_element(By.CLASS_NAME, 'tm04')
content = content_element.text
except:
content = 'No preview available'
# 提取 article_id从链接中用正则 # make the article info a dict
article_id_match = re.search(r'article-(\d+)-1\.html', url) article = {
article_id = article_id_match.group(1) if article_id_match else 'unknown' 'img_url': img_url,
'title': title,
'author': author,
'date': date,
'category': category,
'content': content,
'url': url,
'article_id': article_id
}
# 封装成字典 # save the article info to redis
article = { if save_to_redis(article):
'img_url': img_url, print(article)
'title': title,
'author': author, except Exception as e:
'date': date, logger.error(f"Error processing article: {e}")
'category': category, continue
'main_category': main_category,
'content': content, finally:
'url': url, # Close the WebDriver
'article_id': article_id driver.quit()
} logger.info("WebDriver closed")
# save the article info to redis
if save_to_redis(article):
logger.info(article)
def toot(): def toot():
# get all the keys in redis # get all the keys in redis
@@ -133,26 +223,18 @@ def toot():
continue continue
# download article image to a temp file # download article image to a temp file
#img = requests.get(article['img_url']) img = requests.get(article['img_url'])
img = scraper.get(article['img_url'], timeout=10)
# upload article image to mastodon # upload article image to mastodon
media = mastodon_client.media_post(img.content, 'image/jpeg') media = mastodon_client.media_post(img.content, 'image/jpeg')
# 构建分类标签
if article.get('main_category'):
category_tags = f"#{article['main_category']} #{article['category']}"
else:
category_tags = f"#{article['category']}"
# toot the article info # toot the article info
toot_content = """{title} - {category_tags} by {author} \n {content} \n\n {url} \n\n #chh #chiphell \n """.format( toot_content = """{title} - #{category} by {author} \n {content} \n\n {url} \n\n #chh #chiphell \n """.format(
title=article['title'], title=article['title'],
category_tags=category_tags, category=article['category'],
author=article['author'], author=article['author'],
content=article['content'], content=article['content'],
url=article['url']) url=article['url'])
mastodon_client.status_post(toot_content, media_ids=[media['id']]) mastodon_client.status_post(toot_content, media_ids=[media['id']])
logger.info('Toot %s' % article['title'])
# add the article['id'] to the set # add the article['id'] to the set
redis_db.sadd('send-chh-article-id', article['article_id']) redis_db.sadd('send-chh-article-id', article['article_id'])