"""Mirror a Flickr user's public photos to a Mastodon account.

Flow (executed at module level, top to bottom):

1. Fetch the user's public photos from Flickr and record unseen ones in a
   local SQLite database with status ``'unsynced'``.
2. Flip every ``'unsynced'`` row to ``'syncing'``.
3. Group the ``'syncing'`` photos by title, upload them to Mastodon in
   batches, post one status per batch and mark the uploaded rows
   ``'synced'``.
"""
import datetime
import os
import time

import flickrapi
import peewee
import requests
from loguru import logger
from mastodon import Mastodon

logger.add('flickr2mastodon.log')

db_path = 'photo.db'
photo_db = peewee.SqliteDatabase(db_path)

# Life cycle of Photo.status.
STATUS_UNSYNCED = 'unsynced'
STATUS_SYNCING = 'syncing'
STATUS_SYNCED = 'synced'

# Number of photos attached to a single Mastodon status.
# NOTE(review): presumably matches the instance's per-status attachment
# limit -- confirm before raising it.
MAX_MEDIA_PER_POST = 4

# Download/upload attempts per photo and the HTTP timeout (seconds) used when
# fetching the image from Flickr.
UPLOAD_ATTEMPTS = 3
DOWNLOAD_TIMEOUT = 60


class Photo(peewee.Model):
    """One Flickr photo and its Mastodon sync state.

    The first nine fields mirror an entry of the ``photo`` list returned by
    ``flickr.people.getPublicPhotos``, e.g.::

        {'id': '53484260320',
         'owner': '78663165@N06',
         'secret': 'a253c16b8c',
         'server': '65535',
         'farm': 66,
         'title': 'Cats',
         'ispublic': 1,
         'isfriend': 0,
         'isfamily': 0}
    """

    id = peewee.IntegerField(primary_key=True)
    owner = peewee.CharField()
    secret = peewee.CharField()
    server = peewee.IntegerField()
    farm = peewee.IntegerField()
    title = peewee.CharField()
    ispublic = peewee.IntegerField()
    isfriend = peewee.IntegerField()
    isfamily = peewee.IntegerField()
    # One of STATUS_UNSYNCED / STATUS_SYNCING / STATUS_SYNCED.
    status = peewee.CharField(null=True)
    created_at = peewee.DateTimeField(default=datetime.datetime.now)
    synced_at = peewee.DateTimeField(null=True)

    class Meta:
        database = photo_db


def init_db():
    """Create the ``Photo`` table in the SQLite database file."""
    photo_db.connect()
    photo_db.create_tables([Photo])
    photo_db.close()


# Only initialise the schema when the database file does not exist yet.
if not os.path.exists(db_path):
    init_db()

m_instance = 'https://nofan.xyz'
# NOTE(security): credentials are hard-coded in the source. They can now be
# overridden through environment variables; the literals are kept only as
# fallbacks so existing deployments keep working. Rotate them and drop the
# fallbacks when possible.
m_token = os.environ.get(
    'MASTODON_TOKEN', 'Ug_bUMWCk3RLamOnqYIytmeB0nO6aNfjdmf06mAj2bE')
f_api_key = os.environ.get(
    'FLICKR_API_KEY', 'c743ace0530826568168d2cfeef380c3')
f_api_secret = os.environ.get('FLICKR_API_SECRET', '8401a4e46c40ce0b')

flickr = flickrapi.FlickrAPI(f_api_key, f_api_secret, format='parsed-json')
user_id = '78663165@N06'


def get_flickr_photo_page_url(photo, short=False):
    """Return the Flickr web page URL of *photo*.

    Args:
        photo: object exposing ``owner`` and ``id`` (an int when
            ``short=True``, because it is base58-encoded).
        short: when true, return the ``flic.kr`` short URL instead of the
            full ``www.flickr.com`` one.
    """
    if short:
        return 'https://flic.kr/p/{}'.format(base58_encode(photo.id))
    return 'https://www.flickr.com/photos/{}/{}/'.format(photo.owner, photo.id)


def get_flickr_photo_url(photo, img_size='b'):
    """Return the static image URL of *photo* for the given size suffix.

    Args:
        photo: object exposing ``server``, ``id`` and ``secret``.
        img_size: Flickr size suffix, see the table below.
    """
    # Suffix  Class        Longest edge (px)  Notes
    # s       thumbnail    75                 cropped square
    # q       thumbnail    150                cropped square
    # t       thumbnail    100
    # m       small        240
    # n       small        320
    # w       small        400
    # (none)  medium       500
    # z       medium       640
    # c       medium       800
    # b       large        1024
    # h       large        1600               has its own secret; the owner can restrict it
    # k       large        2048               has its own secret; the owner can restrict it
    # 3k      extra large  3072               has its own secret; the owner can restrict it
    # 4k      extra large  4096               has its own secret; the owner can restrict it
    # f       extra large  4096               has its own secret; the owner can restrict it;
    #                                         only exists for photos with a 2:1 aspect ratio
    # 5k      extra large  5120               has its own secret; the owner can restrict it
    # 6k      extra large  6144               has its own secret; the owner can restrict it
    # o       original     arbitrary          has its own secret; the owner can restrict it;
    #                                         file keeps full EXIF data; file may not be
    #                                         rotated; file may use any extension
    return f"https://live.staticflickr.com/{photo.server}/{photo.id}_{photo.secret}_{img_size}.jpg"


def base58_encode(num):
    """Encode a non-negative integer with the Flickr base58 alphabet.

    Returns an empty string for negative numbers and for ``0``.
    """
    alphabet = '123456789abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ'
    base_count = len(alphabet)
    if num < 0:
        return ''
    encode = ''
    while num >= base_count:
        num, mod = divmod(num, base_count)
        encode = alphabet[mod] + encode
    if num:
        encode = alphabet[num] + encode
    return encode


# --- Step 1: record the user's public photos that are not in the DB yet ---
user_photos = flickr.people.getPublicPhotos(user_id=user_id)
if user_photos.get('stat') != 'ok':
    logger.error('Error: {}'.format(user_photos.get('message')))
    # Exit with a non-zero code so callers (cron, shell) can see the failure.
    raise SystemExit(1)

for photo in user_photos['photos']['photo']:
    if Photo.select().where(Photo.id == photo['id']).exists():
        continue
    Photo.create(
        id=photo['id'],
        owner=photo['owner'],
        secret=photo['secret'],
        server=photo['server'],
        farm=photo['farm'],
        title=photo['title'],
        ispublic=photo['ispublic'],
        isfriend=photo['isfriend'],
        isfamily=photo['isfamily'],
        status=STATUS_UNSYNCED,
    )
    logger.info('New photo: {}'.format(photo['title']))

# Mastodon
mastodon_client = Mastodon(
    access_token=m_token,
    api_base_url=m_instance
)


def upload_photo_to_mastodon(photo_url):
    """Download the image at *photo_url* and upload it to Mastodon.

    Makes up to ``UPLOAD_ATTEMPTS`` attempts, sleeping one second after each
    failure.

    Returns:
        The Mastodon media id on success, or ``None`` when every attempt
        failed.
    """
    for _ in range(UPLOAD_ATTEMPTS):
        try:
            # download photo from flickr
            response = requests.get(photo_url, timeout=DOWNLOAD_TIMEOUT)
            # Do not upload an HTTP error page as if it were a JPEG.
            response.raise_for_status()
            img = mastodon_client.media_post(
                media_file=response.content, mime_type='image/jpeg')
        except Exception as e:
            # Best-effort retry: log and try again.
            logger.error(e)
            logger.error('Error: {}'.format(photo_url))
            time.sleep(1)
        else:
            return img['id']
    return None


# --- Step 2: claim every un-synced photo for this run ---
Photo.update(status=STATUS_SYNCING).where(
    Photo.status == STATUS_UNSYNCED).execute()
syncing_photos = Photo.select().where(
    Photo.status == STATUS_SYNCING).order_by(Photo.created_at.asc())

# Distinct titles, keeping the created_at order of their first photo
# (a plain set() would discard that order).
unsynced_titles = list(dict.fromkeys(p.title for p in syncing_photos))

# --- Step 3: post the photos title by title, in batches ---
for title in unsynced_titles:
    # Materialise once; the rows are sliced and counted several times below.
    photos = list(
        Photo.select()
        .where(Photo.title == title, Photo.status == STATUS_SYNCING)
        .order_by(Photo.created_at.asc())
    )
    batches = [
        photos[i:i + MAX_MEDIA_PER_POST]
        for i in range(0, len(photos), MAX_MEDIA_PER_POST)
    ]
    for idx, batch in enumerate(batches, start=1):
        # Number the posts only when the title needs more than one.
        if len(photos) > MAX_MEDIA_PER_POST:
            status = f'{title} ({idx}/{len(batches)})'
        else:
            status = title

        media_ids = []
        uploaded = []  # photos whose media actually made it to Mastodon
        for p in batch:
            # upload photo to mastodon
            photo_url = get_flickr_photo_url(p)
            short_url = get_flickr_photo_page_url(p, short=True)
            time.sleep(3)
            img_id = upload_photo_to_mastodon(photo_url)
            if img_id:
                media_ids.append(img_id)
                uploaded.append(p)
                status += f'\n{short_url}'
            else:
                logger.error('Error: {}'.format(p.title))
                # Hand the photo back so that a later run retries it.
                Photo.update(status=STATUS_UNSYNCED).where(
                    Photo.id == p.id).execute()

        if not media_ids:
            # Every upload failed; do not publish an image-less status.
            logger.error('Skip post, no media uploaded: {}'.format(status))
            continue

        # post to mastodon
        mastodon_client.status_post(status=status, media_ids=media_ids)
        logger.info('Post: {}'.format(status))

        # Mark only the photos that were really posted; saving a failed one
        # would overwrite its 'unsynced' status with the stale in-memory value.
        for p in uploaded:
            p.status = STATUS_SYNCED
            p.synced_at = datetime.datetime.now()
            p.save()