195 lines
5.9 KiB
Python
195 lines
5.9 KiB
Python
import flickrapi
|
|
from mastodon import Mastodon
|
|
from loguru import logger
|
|
import peewee
|
|
import os
|
|
import datetime
|
|
import requests
|
|
import time
|
|
|
|
logger.add('flickr2mastodon.log')
|
|
db_path = 'photo.db'
|
|
photo_db = peewee.SqliteDatabase(db_path)
|
|
# if not os.path.exists('photo.db'), init db
|
|
class Photo(peewee.Model):
|
|
# 'photo': [{'id': '53484260320',
|
|
# 'owner': '78663165@N06',
|
|
# 'secret': 'a253c16b8c',
|
|
# 'server': '65535',
|
|
# 'farm': 66,
|
|
# 'title': '猫猫们',
|
|
# 'ispublic': 1,
|
|
# 'isfriend': 0,
|
|
# 'isfamily': 0},]
|
|
id = peewee.IntegerField(primary_key=True)
|
|
owner = peewee.CharField()
|
|
secret = peewee.CharField()
|
|
server = peewee.IntegerField()
|
|
farm = peewee.IntegerField()
|
|
title = peewee.CharField()
|
|
ispublic = peewee.IntegerField()
|
|
isfriend = peewee.IntegerField()
|
|
isfamily = peewee.IntegerField()
|
|
synced = peewee.BooleanField(default=False)
|
|
created_at = peewee.DateTimeField(default=datetime.datetime.now)
|
|
synced_at = peewee.DateTimeField(null=True)
|
|
|
|
class Meta:
|
|
database = photo_db
|
|
|
|
def init_db():
|
|
photo_db.connect()
|
|
photo_db.create_tables([Photo])
|
|
photo_db.close()
|
|
|
|
if not os.path.exists(db_path):
|
|
init_db()
|
|
|
|
|
|
m_instance = 'https://nofan.xyz'
|
|
m_token = 'Ug_bUMWCk3RLamOnqYIytmeB0nO6aNfjdmf06mAj2bE'
|
|
f_api_key = 'c743ace0530826568168d2cfeef380c3'
|
|
f_api_secret = '8401a4e46c40ce0b'
|
|
|
|
flickr = flickrapi.FlickrAPI(f_api_key, f_api_secret, format='parsed-json')
|
|
|
|
user_id = '78663165@N06'
|
|
|
|
def get_flickr_photo_page_url(photo, short=False):
|
|
if not short:
|
|
return 'https://www.flickr.com/photos/{}/{}/'.format(photo.owner, photo.id)
|
|
else:
|
|
short_url = base58_encode(photo.id)
|
|
return 'https://flic.kr/p/{}'.format(short_url)
|
|
|
|
|
|
def get_flickr_photo_url(photo, img_size='b'):
|
|
# 後綴 類別 長邊 (像素) 備註
|
|
# s 縮圖 75 經裁剪的正方形
|
|
# q 縮圖 150 經裁剪的正方形
|
|
# t 縮圖 100
|
|
# m 小 240
|
|
# n 小 320
|
|
# w 小 400
|
|
# (none) 中 500
|
|
# z 中 640
|
|
# c 中 800
|
|
# b 大 1024
|
|
# h 大 1600 有獨有的密鑰;相片擁有人可以禁止顯示
|
|
# k 大 2048 有獨有的密鑰;相片擁有人可以禁止顯示
|
|
# 3k 特大 3072 有獨有的密鑰;相片擁有人可以禁止顯示
|
|
# 4k 特大 4096 有獨有的密鑰;相片擁有人可以禁止顯示
|
|
# f 特大 4096 有獨有的密鑰;相片擁有人可以禁止顯示;僅出現於長闊比例為 2:1 的相片
|
|
# 5k 特大 5120 有獨有的密鑰;相片擁有人可以禁止顯示
|
|
# 6k 特大 6144 有獨有的密鑰;相片擁有人可以禁止顯示
|
|
# o 原圖 任意 有獨有的密鑰;相片擁有人可以禁止顯示;檔案具有完整的 EXIF 數據;檔案可能無法旋轉;檔案可以使用任意的副檔名
|
|
return f"https://live.staticflickr.com/{photo.server}/{photo.id}_{photo.secret}_{img_size}.jpg"
|
|
|
|
def base58_encode(num):
|
|
alphabet = '123456789abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ'
|
|
base_count = len(alphabet)
|
|
encode = ''
|
|
if (num < 0):
|
|
return ''
|
|
while (num >= base_count):
|
|
mod = num % base_count
|
|
encode = alphabet[mod] + encode
|
|
num = num // base_count
|
|
if (num):
|
|
encode = alphabet[num] + encode
|
|
return encode
|
|
|
|
|
|
# get all photos
|
|
user_photos = flickr.people.getPublicPhotos(user_id=user_id)
|
|
if user_photos.get('stat') != 'ok':
|
|
logger.error('Error: {}'.format(user_photos.get('message')))
|
|
exit()
|
|
|
|
for photo in user_photos['photos']['photo']:
|
|
if not Photo.select().where(Photo.id == photo['id']).exists():
|
|
Photo.create(
|
|
id=photo['id'],
|
|
owner=photo['owner'],
|
|
secret=photo['secret'],
|
|
server=photo['server'],
|
|
farm=photo['farm'],
|
|
title=photo['title'],
|
|
ispublic=photo['ispublic'],
|
|
isfriend=photo['isfriend'],
|
|
isfamily=photo['isfamily'],
|
|
)
|
|
logger.info('New photo: {}'.format(photo['title']))
|
|
|
|
# Mastodon
|
|
mastodon_client = Mastodon(
|
|
access_token=m_token,
|
|
api_base_url=m_instance
|
|
)
|
|
|
|
def upload_photo_to_mastodon(photo_url):
|
|
for x in range(3):
|
|
try:
|
|
# download photo from flickr
|
|
response = requests.get(photo_url)
|
|
img = mastodon_client.media_post(media_file=response.content, mime_type='image/jpeg')
|
|
break
|
|
except Exception as e:
|
|
logger.error(e)
|
|
logger.error('Error: {}'.format(photo_url))
|
|
time.sleep(1)
|
|
continue
|
|
return img['id']
|
|
|
|
|
|
# get all un-synced photos
|
|
unsynced_photos = Photo.select().where(Photo.synced == False).order_by(Photo.created_at.asc())
|
|
# get titles
|
|
unsynced_titles = set([p.title for p in unsynced_photos])
|
|
for title in unsynced_titles:
|
|
# get photos by title
|
|
photos = Photo.select().where(Photo.title == title).order_by(Photo.created_at.asc())
|
|
if len(photos) > 1:
|
|
batches = [photos[i:i+4] for i in range(0, len(photos), 4)]
|
|
for i, batch in enumerate(batches):
|
|
idx = i + 1
|
|
status = f'{title} ({idx}/{len(batches)}) \n'
|
|
media_ids = []
|
|
for p in batch:
|
|
# upload photo to mastodon
|
|
photo_url = get_flickr_photo_url(p)
|
|
short_url = get_flickr_photo_page_url(p, short=True)
|
|
time.sleep(3)
|
|
img_id = upload_photo_to_mastodon(photo_url)
|
|
if img_id:
|
|
media_ids.append(img_id)
|
|
status += f'{short_url} \n'
|
|
else:
|
|
logger.error('Error: {}'.format(p.title))
|
|
# post to mastodon
|
|
mastodon_client.status_post(status=status, media_ids=media_ids)
|
|
logger.info('Post: {}'.format(status))
|
|
# update synced
|
|
for p in batch:
|
|
p.synced = True
|
|
p.synced_at = datetime.datetime.now()
|
|
p.save()
|
|
else:
|
|
p = photos[0]
|
|
status = title
|
|
media_ids = []
|
|
img_id = upload_photo_to_mastodon(p)
|
|
if img_id:
|
|
media_ids.append(img_id)
|
|
short_url = get_flickr_photo_page_url(p, short=True)
|
|
status += f'\n{short_url}'
|
|
# post to mastodon
|
|
mastodon_client.status_post(status=status, media_ids=media_ids)
|
|
logger.info('Post: {}'.format(status))
|
|
# update synced
|
|
p.synced = True
|
|
p.synced_at = datetime.datetime.now()
|
|
p.save()
|
|
else:
|
|
logger.error('Error: {}'.format(p.title))
|