dsite/scripts/dodo.py
Ching 047bd00545 feat(dodo.py): [M] 增加上一条和删除上一条嘟嘟的功能
[M] 增加上一条和删除上一条嘟嘟的功能

Signed-off-by: Ching <loooching@gmail.com>
2022-01-07 11:59:11 +08:00

268 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# --coding:utf-8--
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from urllib import request
import hashlib
import base64
from Crypto.Cipher import AES
from mastodon import Mastodon
import logging
import redis
import requests
import time
APP_VERIFICATION_TOKEN = 'uKQQiOVMYg2cTgrjkyBmodrHTUaCXzG3'
APP_ID = 'cli_a115fe8b83f9100c'
APP_SECRET = 'yuSQenId0VfvwdZ3qL9wMd8FpCMEUL0u'
ENCRYPT_KEY = '4XfjcA5xou3pztBD4g5V7dgHtr0BBYDE'
EVENT_TYPE = ['im.message.receive_v1']
ADD_GROUP_NAME = True
KEDAI_ID = '107263380636355825'
logging.basicConfig(filename='/root/develop/log/dodo.log', level=logging.INFO)
logger = logging.getLogger('/root/develop/log/dodo.log')
mastodon = Mastodon(
access_token = 'Ug_bUMWCk3RLamOnqYIytmeB0nO6aNfjdmf06mAj2bE',
api_base_url = 'https://nofan.xyz'
)
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_cli = redis.Redis(host='localhost', port=6379, decode_responses=True)
class AESCipher(object):
def __init__(self, key):
self.bs = AES.block_size
self.key=hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
@staticmethod
def str_to_bytes(data):
u_type = type(b"".decode('utf8'))
if isinstance(data, u_type):
return data.encode('utf8')
return data
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
def decrypt(self, enc):
iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size:]))
def decrypt_string(self, enc):
enc = base64.b64decode(enc)
return self.decrypt(enc).decode('utf8')
def get_tenant_access_token(): # 获取token
token = redis_cli.get('tenant_access_token_%s' % APP_ID)
if token:
return token
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
headers = {
"Content-Type": "application/json"
}
req_body = {
"app_id": APP_ID,
"app_secret": APP_SECRET
}
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
try:
response = request.urlopen(req)
except Exception as e:
logger.error('get tenant token error: %s', e.read().decode())
return ""
rsp_body = response.read().decode('utf-8')
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
logger.error("get tenant_access_token error, code =%s", code)
return ""
token = redis_cli.set('tenant_access_token_%s' % APP_ID,
rsp_dict.get("tenant_access_token", ""),
ex=60*30)
return token
def get_group_name(chat_id):
group_name = redis_cli.get('group_name_%s' % chat_id)
if not group_name:
url = "https://open.feishu.cn/open-apis/im/v1/chats/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + get_tenant_access_token()
}
try:
resp = requests.get(url+chat_id, headers=headers)
resp_data = resp.json()
code = resp_data.get("code", -1)
if code == 0:
group_name = resp_data.get('data', {}).get('name')
redis_cli.set('group_name_%s' % chat_id,
group_name,
ex=60*60*24)
except:
# todo: log
return
return group_name
class RequestHandler(BaseHTTPRequestHandler):
def do_POST(self):
# 解析请求 body
req_body = self.rfile.read(int(self.headers['content-length']))
obj = json.loads(req_body.decode("utf-8"))
cipher = AESCipher(ENCRYPT_KEY)
obj = json.loads(cipher.decrypt_string(obj['encrypt']))
logger.info('lark request body: %s', obj)
# 校验 verification token 是否匹配token 不匹配说明该回调并非来自开发平台
token = obj.get("token", "")
if not token:
token = obj.get('header', {}).get('token', '')
if token != APP_VERIFICATION_TOKEN:
logger.error("verification token not match, token =%s", token)
self.response("")
return
# 根据 type 处理不同类型事件
type = obj.get("type", "")
if not type:
type = obj.get('header', {}).get('event_type')
if "url_verification" == type: # 验证请求 URL 是否有效
self.handle_request_url_verify(obj)
elif type in EVENT_TYPE: # 事件回调
# 获取事件内容和类型,并进行相应处理,此处只关注给机器人推送的消息事件
event_id = obj.get('header', {}).get('event_id', '')
# 重复收到的事件不处理
if event_id and redis_cli.get(event_id):
self.response("")
return
event = obj.get("event")
if event.get("message"):
self.handle_message(event, event_id)
return
return
def handle_request_url_verify(self, post_obj):
# 原样返回 challenge 字段内容
challenge = post_obj.get("challenge", "")
rsp = {'challenge': challenge}
self.response(json.dumps(rsp))
return
def handle_message(self, event, event_id=None):
# 此处只处理 text 类型消息,其他类型消息忽略
msg = event.get('message', {})
msg_type = msg.get("message_type", "")
if msg_type == "text":
# 调用发消息 API 之前,先要获取 API 调用凭证tenant_access_token
access_token = get_tenant_access_token()
if access_token == "":
self.response("")
return
# 机器人回复收到的消息
text = json.loads(msg.get('content')).get('text')
orig_text = text
if msg.get('chat_type') == 'group' and msg.get('mentions'):
open_id = {"open_chat_id": msg.get("chat_id")}
for mention in msg.get('mentions'):
text = text.replace(mention['key'], '')
text = text.lstrip()
orig_text = text
if ADD_GROUP_NAME:
group_name = get_group_name(msg.get("chat_id"))
text = '%s #%s' % (text, group_name)
else:
open_id = {"open_id": event.get("sender", {}).get(
'sender_id', {}).get('open_id')}
self.response("")
if orig_text == '/last':
try:
statuses = mastodon.account_statuses(KEDAI_ID, limit=1)
self.msg_compoment(access_token, open_id,
statuses[0]['content'])
except Exception as exc:
logger.error('operation error: %s', str(exc))
elif orig_text == '/del':
try:
statuses = mastodon.account_statuses(KEDAI_ID, limit=1)
Mastodon.status_delete(statuses[0]['id'])
self.msg_compoment(access_token, open_id,
'已删除: ' + statuses[0]['content'])
except Exception as exc:
logger.error('operation error: %s', str(exc))
try:
toot_resp = mastodon.status_post(text)
if toot_resp.get('id'):
self.msg_compoment(access_token, open_id, '📟 dodo 📟')
redis_cli.set(event_id, int(time.time()), ex=60*60*7)
else:
self.msg_compoment(access_token, open_id, """⚠️ didi ⚠️
%s
""" % json.loads(toot_resp))
except Exception as exc:
logger.error('send toot error: %s', str(exc))
return
elif msg_type == "image":
self.response("")
return
def response(self, body):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(body.encode())
def send_message(self, token, open_id, text):
url = "https://open.feishu.cn/open-apis/message/v4/send/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + token
}
req_body = {
"msg_type": "text",
"content": {
"text": text
}
}
req_body = dict(req_body, **open_id) # 根据open_id判断返回域
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
try:
response = request.urlopen(req)
except Exception as e:
logger.error('send message error: %s', e.read().decode())
return
rsp_body = response.read().decode('utf-8')
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
logger.error("send message error, code = %s, msg =%s",
code,
rsp_dict.get("msg", ""))
def msg_compoment(self, token, open_id, text):
self.send_message(token, open_id, text)
def run():
port = 5000
server_address = ('', port)
httpd = HTTPServer(server_address, RequestHandler)
logger.info("start...")
httpd.serve_forever()
if __name__ == '__main__':
run()