from mastodon import Mastodon import argparse import time import requests import re from datetime import datetime from loguru import logger import os # cookie = "u=%7EBVoC1S9tgw4; uuid=24edfe42401cbb03cf1f.1676446846.20; PHPSESSID=4ev7ef0f9cim18lrest8l52kn6; m=looo.ching%40gmail.com; tgw_l7_route=f174d6f255742a6ee2e9a07cf9cca0fa" # token = "9927e23b" logger.add('/root/develop/log/mas2ff-hegui.log', level='INFO') def post_fanfou(cookie, token, fanfou_status, images=None): if not images: images = [] # post status if not images: try: response = requests.post( url="https://m.fanfou.com/home", params={ "v": str(int(datetime.timestamp(datetime.now()))), }, headers={ "Authority": "m.fanfou.com", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6", "Cache-Control": "max-age=0", "Content-Type": "application/x-www-form-urlencoded", "Cookie": cookie, "Origin": "https://m.fanfou.com", "Referer": "https://m.fanfou.com/home", "Sec-Ch-Ua": "\"Chromium\";v=\"118\", \"Google Chrome\";v=\"118\", \"Not=A?Brand\";v=\"99\"", "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": "\"macOS\"", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-User": "?1", "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Accept-Encoding": "gzip", }, data={ "content": fanfou_status, "token": token, "action": "msg.post", }, ) logger.info('%s, Response status code: %s' % (fanfou_status, response.status_code)) if not '你在做什么?' in response.text: logger.error('Post %s maybe failed', fanfou_status) except requests.exceptions.RequestException: logger.error('Post %s failed', fanfou_status) else: # post with images image_urls = [media['url'] for media in images if media['type'] == 'image'] image_count = '' if len(image_urls) > 1: image_count = '(%%s/%s)' % len(image_urls) headers = { "Authority": "m.fanfou.com", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6", "Cache-Control": "max-age=0", "Cookie": cookie, "Origin": "https://m.fanfou.com", "Referer": "https://m.fanfou.com/photo.upload", "Sec-Ch-Ua": "\"Chromium\";v=\"118\", \"Google Chrome\";v=\"118\", \"Not=A?Brand\";v=\"99\"", "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": "\"macOS\"", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-User": "?1", "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Accept-Encoding": "gzip", } for x in range(len(image_urls)): try: image_resp = requests.get(image_urls[x]) status = fanfou_status if image_count: status = fanfou_status + ' ' + image_count % (x+1) if image_resp.status_code == 200: # 创建一个临时文件 with open('temp.jpg', 'wb') as file: file.write(response.content) # 确保文件存在 if os.path.exists('temp.jpg'): with open('temp.jpg', 'rb') as file: # 以下是您的multipart/form-data请求部分 files = { 'picture': ('temp.jpg', file, 'image/jpeg') # 注意这里仍然需要指定MIME类型 } data = { 'desc': status, 'action': 'photo.upload', 'photo': '', 'token': token, } # 发送请求 try: response = requests.post("https://m.fanfou.com/home/upload", headers=headers, files=files, data=data) logger.info('%s, Response status code: %s' % (status, response.status_code)) except requests.exceptions.RequestException: logger.error('Post %s failed', status) except: continue def run(instance, access_token, fanfou_cookie, fanfou_token): mastodon_cli = Mastodon( access_token=access_token, api_base_url=instance) me_info = mastodon_cli.me() me_id = me_info['id'] me_timeline = mastodon_cli.account_statuses( me_id, exclude_replies=True) min_id = None max_id = None # first time run # init min_id for status in me_timeline: if not status['reblog'] and status['visibility'] == 'public': min_id = status['id'] logger.info(status['content']) break # if is in already posted status posted_ids = [] # if file not exist, create it with open('last_id.txt', 'a+') as f: f.seek(0) # read lines posted_ids = f.readlines() # remove \n posted_ids = [x.strip() for x in posted_ids] #print(posted_ids) if str(min_id) not in posted_ids: content = status['content'].replace('
', '\n') content = re.sub('<.*?>', '', content) post_fanfou(fanfou_cookie, fanfou_token, content, status['media_attachments']) # write to file with open('last_id.txt', 'a') as f: f.write(str(min_id) + '\n') posted_ids.append(min_id) logger.info('write toot') time.sleep(10) while True: requests.get('https://up.tunpok.com/api/push/sQLwSpeiUH?msg=OK&ping=') # get new status me_timeline = mastodon_cli.account_statuses( me_id, exclude_replies=True, min_id=min_id) # if no new status if not me_timeline: time.sleep(10) continue statuses = [] for status in me_timeline: statuses.append(status) statuses.reverse() for status in statuses: if not status['reblog'] and status['visibility'] == 'public': if str(status['id']) not in posted_ids: content = status['content'].replace('
', '\n') content = re.sub('<.*?>', '', content) logger.info(status['id'] + ' ' + content) post_fanfou(fanfou_cookie, fanfou_token, content, status['media_attachments']) with open('last_id.txt', 'a') as f: f.write(str(status['id']) + '\n') posted_ids.append(status['id']) logger.info('write toot %s', status['content']) time.sleep(10) # update min_id if statuses: min_id = statuses[-1]['id'] if __name__ == '__main__': cookie = 'u=stokesia; uuid=24edfe42401cbb03cf1f.1676446846.21; PHPSESSID=d4l9m3tp09gasff6r3u8mv9p12; m=yvettezhong%40gmail.com; tgw_l7_route=f174d6f255742a6ee2e9a07cf9cca0fa' token = '742ac481' m_ins = 'https://nofan.xyz' m_token = 'C5zKq2Mf54E7gaSDe47Bds080ySpsdvaykZBAtt5jvo' run(m_ins, m_token, cookie, token)