dsite/scripts/dodo.py
Ching a0fa3ac98c fix(dodo): 修复指令执行后当成嘟嘟发送的问题
修复指令执行后当成嘟嘟发送的问题

Signed-off-by: Ching <loooching@gmail.com>
2022-01-21 14:22:22 +08:00

319 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# --coding:utf-8--
import os
import sys
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dsite.settings")
sys.path.insert(0, '../')
sys.path.insert(0, './')
from django.core.wsgi import get_wsgi_application
get_wsgi_application()
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from urllib import request
import hashlib
import base64
from Crypto.Cipher import AES
from mastodon import Mastodon
import logging
import redis
import requests
import time
import subprocess
from bs4 import BeautifulSoup
import recipe.models
from utils import const
APP_VERIFICATION_TOKEN = 'uKQQiOVMYg2cTgrjkyBmodrHTUaCXzG3'
APP_ID = 'cli_a115fe8b83f9100c'
APP_SECRET = 'yuSQenId0VfvwdZ3qL9wMd8FpCMEUL0u'
ENCRYPT_KEY = '4XfjcA5xou3pztBD4g5V7dgHtr0BBYDE'
EVENT_TYPE = ['im.message.receive_v1']
ADD_GROUP_NAME = True
KEDAI_ID = '107263380636355825'
logging.basicConfig(filename='/root/develop/log/dodo.log', level=logging.INFO)
logger = logging.getLogger('/root/develop/log/dodo.log')
mastodon_cli = Mastodon(
access_token = 'Ug_bUMWCk3RLamOnqYIytmeB0nO6aNfjdmf06mAj2bE',
api_base_url = 'https://nofan.xyz'
)
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_cli = redis.Redis(host='localhost', port=6379, decode_responses=True)
class AESCipher(object):
def __init__(self, key):
self.bs = AES.block_size
self.key=hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
@staticmethod
def str_to_bytes(data):
u_type = type(b"".decode('utf8'))
if isinstance(data, u_type):
return data.encode('utf8')
return data
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
def decrypt(self, enc):
iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size:]))
def decrypt_string(self, enc):
enc = base64.b64decode(enc)
return self.decrypt(enc).decode('utf8')
def get_tenant_access_token(): # 获取token
token = redis_cli.get('tenant_access_token_%s' % APP_ID)
if token:
return token
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
headers = {
"Content-Type": "application/json"
}
req_body = {
"app_id": APP_ID,
"app_secret": APP_SECRET
}
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
try:
response = request.urlopen(req)
except Exception as e:
logger.error('get tenant token error: %s', e.read().decode())
return ""
rsp_body = response.read().decode('utf-8')
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
logger.error("get tenant_access_token error, code =%s", code)
return ""
token = rsp_dict.get("tenant_access_token", "")
redis_cli.set('tenant_access_token_%s' % APP_ID,
rsp_dict.get("tenant_access_token", ""),
ex=60*30)
return token
def get_group_name(chat_id):
group_name = redis_cli.get('group_name_%s' % chat_id)
if not group_name:
url = "https://open.feishu.cn/open-apis/im/v1/chats/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + get_tenant_access_token()
}
try:
resp = requests.get(url+chat_id, headers=headers)
resp_data = resp.json()
code = resp_data.get("code", -1)
if code == 0:
group_name = resp_data.get('data', {}).get('name')
redis_cli.set('group_name_%s' % chat_id,
group_name,
ex=60*60*24)
except:
# todo: log
return
return group_name
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""Serve a GET request."""
self.response("")
def do_POST(self):
# 解析请求 body
req_body = self.rfile.read(int(self.headers['content-length']))
obj = json.loads(req_body.decode("utf-8"))
cipher = AESCipher(ENCRYPT_KEY)
obj = json.loads(cipher.decrypt_string(obj['encrypt']))
logger.info('lark request body: %s', obj)
# 校验 verification token 是否匹配token 不匹配说明该回调并非来自开发平台
token = obj.get("token", "")
if not token:
token = obj.get('header', {}).get('token', '')
if token != APP_VERIFICATION_TOKEN:
logger.error("verification token not match, token =%s", token)
self.response("")
return
# 根据 type 处理不同类型事件
type = obj.get("type", "")
if not type:
type = obj.get('header', {}).get('event_type')
if "url_verification" == type: # 验证请求 URL 是否有效
self.handle_request_url_verify(obj)
elif type in EVENT_TYPE: # 事件回调
# 获取事件内容和类型,并进行相应处理,此处只关注给机器人推送的消息事件
event_id = obj.get('header', {}).get('event_id', '')
# 重复收到的事件不处理
if event_id and redis_cli.get(event_id):
self.response("")
return
event = obj.get("event")
if event.get("message"):
self.handle_message(event, event_id)
return
return
def handle_request_url_verify(self, post_obj):
# 原样返回 challenge 字段内容
challenge = post_obj.get("challenge", "")
rsp = {'challenge': challenge}
self.response(json.dumps(rsp))
return
def handle_message(self, event, event_id=None):
# 此处只处理 text 类型消息,其他类型消息忽略
msg = event.get('message', {})
msg_type = msg.get("message_type", "")
if msg_type == "text":
# 调用发消息 API 之前,先要获取 API 调用凭证tenant_access_token
access_token = get_tenant_access_token()
if access_token == "":
self.response("")
return
# 机器人回复收到的消息
text = json.loads(msg.get('content')).get('text')
orig_text = text
if msg.get('chat_type') == 'group' and msg.get('mentions'):
open_id = {"open_chat_id": msg.get("chat_id")}
for mention in msg.get('mentions'):
text = text.replace(mention['key'], '')
text = text.lstrip()
orig_text = text
if ADD_GROUP_NAME:
group_name = get_group_name(msg.get("chat_id"))
text = '%s #%s' % (text, group_name)
else:
open_id = {"open_id": event.get("sender", {}).get(
'sender_id', {}).get('open_id')}
self.response("")
if orig_text.startswith('/'):
redis_cli.set(event_id, int(time.time()), ex=60*60*7)
if orig_text not in ['/last', '/del']:
flag = False
for action_ in ['/deploy ', '/菜谱 ']:
if orig_text.startswith(action_):
flag = True
break
if not flag:
self.msg_compoment(access_token, open_id, '指令错误')
return
if orig_text == '/last':
try:
statuses = mastodon_cli.account_statuses(KEDAI_ID, limit=1)
s_text = BeautifulSoup(statuses[0]['content'], 'html.parser')
self.msg_compoment(access_token, open_id,
s_text.get_text(''))
except Exception as exc:
logger.error('operation error: %s', str(exc))
elif orig_text == '/del':
try:
statuses = mastodon_cli.account_statuses(KEDAI_ID, limit=1)
mastodon_cli.status_delete(statuses[0]['id'])
s_text = BeautifulSoup(statuses[0]['content'], 'html.parser')
self.msg_compoment(access_token, open_id,
'已删除: ' + s_text.get_text(''))
except Exception as exc:
logger.error('operation error: %s', str(exc))
elif orig_text.startswith('/deploy '):
site_ = orig_text.split('/deploy ')[1]
if site_ == 'dsite':
self.msg_compoment(access_token, open_id, '🚧 %s 开始部署 🚧' % site_)
subprocess.call("/root/deploy/dsite_prepare.sh")
subprocess.run(["supervisorctl", "restart", "dsite"])
self.msg_compoment(access_token, open_id, '🎉 %s 部署成功 🎉' % site_)
else:
self.msg_compoment(access_token, open_id, '⚠️ %s 不存在 ⚠️' % site_)
elif orig_text.startswith('/菜谱 '):
content = orig_text.split('/菜谱 ')[1]
recipe_ = recipe.models.Recipe.create_from_str(content)
if recipe_:
self.msg_compoment(access_token, open_id, None,
const.LARK_WEBHOOK_MSG_TYPE_INTERACTIVE,
recipe_.construct_lart_card())
else:
self.msg_compoment(access_token, open_id, '⚠️ 创建失败 ⚠️')
return
try:
toot_resp = mastodon_cli.status_post(text)
if toot_resp.get('id'):
self.msg_compoment(access_token, open_id, '📟 dodo 📟')
redis_cli.set(event_id, int(time.time()), ex=60*60*7)
else:
self.msg_compoment(access_token, open_id, """⚠️ didi ⚠️
%s
""" % json.loads(toot_resp))
except Exception as exc:
logger.error('send toot error: %s', str(exc))
return
elif msg_type == "image":
self.response("")
return
def response(self, body):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(body.encode())
def send_message(self, token, open_id, text, msg_type=None, content=None):
url = "https://open.feishu.cn/open-apis/message/v4/send/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + token
}
if not msg_type:
msg_type = const.LARK_WEBHOOK_MSG_TYPE_TEXT
req_body = {
"msg_type": msg_type
}
if msg_type == const.LARK_WEBHOOK_MSG_TYPE_TEXT:
req_body['content'] = {'text': text}
elif msg_type == const.LARK_WEBHOOK_MSG_TYPE_INTERACTIVE:
req_body['card'] = content
req_body = dict(req_body, **open_id) # 根据open_id判断返回域
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
try:
response = request.urlopen(req)
except Exception as e:
logger.error('send message error: %s', e.read().decode())
return
rsp_body = response.read().decode('utf-8')
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
logger.error("send message error, code = %s, msg =%s",
code,
rsp_dict.get("msg", ""))
def msg_compoment(self, token, open_id, text, msg_type=None, content=None):
self.send_message(token, open_id, text, msg_type, content)
def run():
port = 5000
server_address = ('', port)
httpd = HTTPServer(server_address, RequestHandler)
logger.info("start...")
httpd.serve_forever()
if __name__ == '__main__':
run()