#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""HTTP callback server for a Feishu (Lark) bot.

Incoming event callbacks are AES-decrypted and verified against the app's
verification token.  Text messages starting with ``/`` are treated as bot
commands (``/last``, ``/del``, ``/deploy <site>``, ``/菜谱 <text>``); any other
text message is posted to Mastodon.  Redis caches the tenant access token,
group names and the ids of events that were already handled.
"""
import os
import sys

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dsite.settings")
sys.path.insert(0, '../')
sys.path.insert(0, './')

from django.core.wsgi import get_wsgi_application

# NOTE(review): presumably called only to initialise Django so that the
# project imports below (recipe.models) work -- confirm before reordering.
get_wsgi_application()

from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from urllib import request
from urllib.error import HTTPError
import hashlib
import base64
from Crypto.Cipher import AES
from mastodon import Mastodon
import logging
import redis
import requests
import time
import subprocess
from bs4 import BeautifulSoup

import recipe.models
from utils import const

# NOTE(review): credentials are hard-coded in source; they should be moved to
# configuration / environment and rotated.  Values are left untouched here.
APP_VERIFICATION_TOKEN = 'uKQQiOVMYg2cTgrjkyBmodrHTUaCXzG3'
APP_ID = 'cli_a115fe8b83f9100c'
APP_SECRET = 'yuSQenId0VfvwdZ3qL9wMd8FpCMEUL0u'
ENCRYPT_KEY = '4XfjcA5xou3pztBD4g5V7dgHtr0BBYDE'
EVENT_TYPE = ['im.message.receive_v1']
ADD_GROUP_NAME = True
KEDAI_ID = '107263380636355825'

# Timeout (seconds) for outbound HTTP calls so a stalled peer cannot block
# the single-threaded server forever.
HTTP_TIMEOUT = 10
# Expiry (seconds) of the Redis entries written by this module.
TOKEN_CACHE_TTL = 60 * 30
GROUP_NAME_CACHE_TTL = 60 * 60 * 24
EVENT_DEDUP_TTL = 60 * 60 * 7

logging.basicConfig(filename='/root/develop/log/dodo.log', level=logging.INFO)
logger = logging.getLogger('/root/develop/log/dodo.log')

mastodon_cli = Mastodon(
    access_token='Ug_bUMWCk3RLamOnqYIytmeB0nO6aNfjdmf06mAj2bE',
    api_base_url='https://nofan.xyz'
)

pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
# Use the pool created above (it was previously created but never used).
redis_cli = redis.Redis(connection_pool=pool)


def _error_detail(exc):
    """Return a loggable description of *exc*.

    Only ``HTTPError`` carries a response body; every other exception
    (``URLError``, timeouts, decode errors, ...) has no ``read()`` method, so
    it is rendered with ``str()``.
    """
    if isinstance(exc, HTTPError):
        return '%s %s' % (exc, exc.read().decode('utf-8', 'replace'))
    return str(exc)


class AESCipher(object):
    """AES-CBC decryptor whose key is the SHA-256 digest of a passphrase."""

    def __init__(self, key):
        self.bs = AES.block_size
        self.key = hashlib.sha256(AESCipher.str_to_bytes(key)).digest()

    @staticmethod
    def str_to_bytes(data):
        """Encode *data* as UTF-8 if it is a ``str``; return it unchanged otherwise."""
        if isinstance(data, str):
            return data.encode('utf8')
        return data

    @staticmethod
    def _unpad(s):
        """Drop the trailing padding; the last byte's value is the pad length."""
        return s[:-ord(s[len(s) - 1:])]

    def decrypt(self, enc):
        """Decrypt raw bytes laid out as ``IV || ciphertext`` and strip padding."""
        iv = enc[:AES.block_size]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return self._unpad(cipher.decrypt(enc[AES.block_size:]))

    def decrypt_string(self, enc):
        """Base64-decode *enc*, decrypt it and return the UTF-8 text."""
        enc = base64.b64decode(enc)
        return self.decrypt(enc).decode('utf8')


def get_tenant_access_token():
    """Return the tenant access token, or ``""`` when it cannot be obtained.

    The token is read from Redis when cached; otherwise it is requested from
    the Feishu auth endpoint and cached for ``TOKEN_CACHE_TTL`` seconds.
    """
    cache_key = 'tenant_access_token_%s' % APP_ID
    token = redis_cli.get(cache_key)
    if token:
        return token

    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    headers = {
        "Content-Type": "application/json"
    }
    req_body = {
        "app_id": APP_ID,
        "app_secret": APP_SECRET
    }
    data = bytes(json.dumps(req_body), encoding='utf8')
    req = request.Request(url=url, data=data, headers=headers, method='POST')
    try:
        with request.urlopen(req, timeout=HTTP_TIMEOUT) as response:
            rsp_body = response.read().decode('utf-8')
        rsp_dict = json.loads(rsp_body)
    except Exception as e:
        logger.error('get tenant token error: %s', _error_detail(e))
        return ""

    code = rsp_dict.get("code", -1)
    if code != 0:
        logger.error("get tenant_access_token error, code =%s", code)
        return ""

    token = rsp_dict.get("tenant_access_token", "")
    if token:
        # Never cache an empty token: it would only be re-fetched anyway.
        redis_cli.set(cache_key, token, ex=TOKEN_CACHE_TTL)
    return token


def get_group_name(chat_id):
    """Return the name of chat *chat_id*, or ``None`` when it is unavailable.

    The name is read from Redis when cached; otherwise it is fetched from the
    Feishu chats endpoint and cached for ``GROUP_NAME_CACHE_TTL`` seconds.
    """
    cache_key = 'group_name_%s' % chat_id
    group_name = redis_cli.get(cache_key)
    if group_name:
        return group_name

    url = "https://open.feishu.cn/open-apis/im/v1/chats/"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + get_tenant_access_token()
    }
    try:
        resp = requests.get(url + chat_id, headers=headers, timeout=HTTP_TIMEOUT)
        resp_data = resp.json()
    except Exception as exc:
        logger.error('get group name error: %s', exc)
        return None

    code = resp_data.get("code", -1)
    if code != 0:
        logger.error('get group name error, code = %s', code)
        return None

    group_name = (resp_data.get('data') or {}).get('name')
    if group_name:
        try:
            redis_cli.set(cache_key, group_name, ex=GROUP_NAME_CACHE_TTL)
        except redis.RedisError as exc:
            # Caching is best-effort; the freshly fetched name is still usable.
            logger.error('cache group name error: %s', exc)
    return group_name


class RequestHandler(BaseHTTPRequestHandler):
    """Handles Feishu event callbacks delivered over HTTP POST."""

    def do_GET(self):
        """Serve a GET request."""
        self.response("")

    def do_POST(self):
        """Decrypt, verify and dispatch one callback; always send a response."""
        # Parse the request body.
        try:
            length = int(self.headers.get('content-length') or 0)
            envelope = json.loads(self.rfile.read(length).decode("utf-8"))
            cipher = AESCipher(ENCRYPT_KEY)
            obj = json.loads(cipher.decrypt_string(envelope['encrypt']))
        except (ValueError, KeyError, TypeError) as exc:
            logger.error('invalid lark request body: %s', exc)
            self.send_error(400)
            return
        if not isinstance(obj, dict):
            logger.error('invalid lark request body: %s', obj)
            self.send_error(400)
            return
        logger.info('lark request body: %s', obj)

        # Check the verification token; a mismatch means the callback did not
        # come from the open platform.
        token = obj.get("token", "")
        if not token:
            token = obj.get('header', {}).get('token', '')
        if token != APP_VERIFICATION_TOKEN:
            logger.error("verification token not match, token =%s", token)
            self.response("")
            return

        # Dispatch on the event type (top-level "type" or header "event_type").
        event_type = obj.get("type", "")
        if not event_type:
            event_type = obj.get('header', {}).get('event_type')

        if event_type == "url_verification":
            # Request-URL verification handshake.
            self.handle_request_url_verify(obj)
            return

        if event_type in EVENT_TYPE:
            # Event callback; only message events pushed to the bot matter.
            event_id = obj.get('header', {}).get('event_id', '')
            # Skip events that were already handled.
            if event_id and redis_cli.get(event_id):
                self.response("")
                return
            event = obj.get("event") or {}
            if event.get("message"):
                self.handle_message(event, event_id)
                return

        # Unknown event types and message-less events are acknowledged too, so
        # the caller is never left without a response.
        self.response("")

    def handle_request_url_verify(self, post_obj):
        """Echo the ``challenge`` field back unchanged."""
        challenge = post_obj.get("challenge", "")
        rsp = {'challenge': challenge}
        self.response(json.dumps(rsp))

    def handle_message(self, event, event_id=None):
        """Acknowledge a message event, then act on it if it is a text message.

        Every path answers the callback with an empty 200 body, so the
        acknowledgement is sent first and the (possibly slow) processing runs
        afterwards.  Message types other than text are ignored.
        """
        self.response("")
        msg = event.get('message', {})
        if msg.get("message_type", "") != "text":
            return
        try:
            self._process_text_message(event, msg, event_id)
        except Exception:
            # The callback is already acknowledged; record the failure.
            logger.exception('handle message error')

    def _process_text_message(self, event, msg, event_id):
        """Route one text message to the command handler or to Mastodon."""
        # A tenant_access_token is required before any send-message API call.
        access_token = get_tenant_access_token()
        if access_token == "":
            return

        text = json.loads(msg.get('content')).get('text') or ''
        orig_text = text
        mentions = msg.get('mentions')
        if msg.get('chat_type') == 'group' and mentions:
            # Group chat: reply to the chat and strip the mention placeholders.
            open_id = {"open_chat_id": msg.get("chat_id")}
            for mention in mentions:
                text = text.replace(mention['key'], '')
            text = text.lstrip()
            orig_text = text
            if ADD_GROUP_NAME:
                group_name = get_group_name(msg.get("chat_id"))
                if group_name:
                    # Skip the tag when the name is unknown instead of
                    # appending a literal "#None".
                    text = '%s #%s' % (text, group_name)
        else:
            # Otherwise reply to the sender directly.
            open_id = {"open_id": event.get("sender", {}).get(
                'sender_id', {}).get('open_id')}

        if orig_text.startswith('/'):
            self._mark_event_handled(event_id)
            self._handle_command(access_token, open_id, orig_text)
            return
        self._post_toot(access_token, open_id, text, event_id)

    def _mark_event_handled(self, event_id):
        """Remember *event_id* in Redis so a redelivered event is skipped."""
        if event_id:
            redis_cli.set(event_id, int(time.time()), ex=EVENT_DEDUP_TTL)

    def _handle_command(self, access_token, open_id, command):
        """Run the slash command *command* and report the outcome to the chat."""
        if command == '/last':
            self._show_last_status(access_token, open_id)
        elif command == '/del':
            self._delete_last_status(access_token, open_id)
        elif command.startswith('/deploy '):
            self._deploy(access_token, open_id, command.split('/deploy ', 1)[1])
        elif command.startswith('/菜谱 '):
            # maxsplit=1 keeps recipe text that itself contains the prefix.
            self._create_recipe(access_token, open_id, command.split('/菜谱 ', 1)[1])
        else:
            self.msg_compoment(access_token, open_id, '指令错误')

    def _show_last_status(self, access_token, open_id):
        """Send the plain text of the account's most recent status."""
        try:
            statuses = mastodon_cli.account_statuses(KEDAI_ID, limit=1)
            s_text = BeautifulSoup(statuses[0]['content'], 'html.parser')
            self.msg_compoment(access_token, open_id, s_text.get_text(''))
        except Exception as exc:
            logger.error('operation error: %s', str(exc))

    def _delete_last_status(self, access_token, open_id):
        """Delete the account's most recent status and send its text."""
        try:
            statuses = mastodon_cli.account_statuses(KEDAI_ID, limit=1)
            mastodon_cli.status_delete(statuses[0]['id'])
            s_text = BeautifulSoup(statuses[0]['content'], 'html.parser')
            self.msg_compoment(access_token, open_id, '已删除: ' + s_text.get_text(''))
        except Exception as exc:
            logger.error('operation error: %s', str(exc))

    def _deploy(self, access_token, open_id, site_):
        """Deploy *site_* (only ``dsite`` is known) and report the result."""
        if site_ != 'dsite':
            self.msg_compoment(access_token, open_id, '⚠️ %s 不存在 ⚠️' % site_)
            return
        self.msg_compoment(access_token, open_id, '🚧 %s 开始部署 🚧' % site_)
        try:
            succeeded = subprocess.call("/root/deploy/dsite_prepare.sh") == 0
            if succeeded:
                restart = subprocess.run(["supervisorctl", "restart", "dsite"])
                succeeded = restart.returncode == 0
        except OSError as exc:
            logger.error('deploy error: %s', exc)
            succeeded = False
        # Report success only when both steps exited with status 0.
        if succeeded:
            self.msg_compoment(access_token, open_id, '🎉 %s 部署成功 🎉' % site_)
        else:
            self.msg_compoment(access_token, open_id, '❌ %s 部署失败 ❌' % site_)

    def _create_recipe(self, access_token, open_id, content):
        """Create a recipe from *content* and reply with its card."""
        recipe_ = recipe.models.Recipe.create_from_str(content)
        if recipe_:
            self.msg_compoment(access_token, open_id, None,
                               const.LARK_WEBHOOK_MSG_TYPE_INTERACTIVE,
                               recipe_.construct_lart_card())
        else:
            self.msg_compoment(access_token, open_id, '⚠️ 创建失败 ⚠️')

    def _post_toot(self, access_token, open_id, text, event_id):
        """Post *text* to Mastodon and tell the chat whether it worked."""
        try:
            toot_resp = mastodon_cli.status_post(text)
            if toot_resp.get('id'):
                self.msg_compoment(access_token, open_id, '📟 dodo 📟')
                self._mark_event_handled(event_id)
            else:
                # The response is already a mapping: serialise it for display
                # (json.loads on it would raise TypeError).
                self.msg_compoment(access_token, open_id, """⚠️ didi ⚠️ %s """ % json.dumps(
                    toot_resp, ensure_ascii=False, default=str))
        except Exception as exc:
            logger.error('send toot error: %s', str(exc))

    def response(self, body):
        """Send *body* (a ``str``) as a 200 response."""
        payload = body.encode()
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        # Without Content-Length the body is only delimited by the connection
        # closing, i.e. after all the work following the acknowledgement.
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def send_message(self, token, open_id, text, msg_type=None, content=None):
        """Send a text or interactive-card message through the Feishu API.

        *open_id* is a one-entry dict (``open_id`` or ``open_chat_id``) that is
        merged into the request body and selects the recipient.  *text* is used
        for text messages, *content* for interactive cards.  Failures are
        logged, not raised.
        """
        url = "https://open.feishu.cn/open-apis/message/v4/send/"
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + token
        }
        if not msg_type:
            msg_type = const.LARK_WEBHOOK_MSG_TYPE_TEXT
        req_body = {
            "msg_type": msg_type
        }
        if msg_type == const.LARK_WEBHOOK_MSG_TYPE_TEXT:
            req_body['content'] = {'text': text}
        elif msg_type == const.LARK_WEBHOOK_MSG_TYPE_INTERACTIVE:
            req_body['card'] = content
        # The open_id entry decides where the reply goes.
        req_body = dict(req_body, **open_id)

        data = bytes(json.dumps(req_body), encoding='utf8')
        req = request.Request(url=url, data=data, headers=headers, method='POST')
        try:
            with request.urlopen(req, timeout=HTTP_TIMEOUT) as response:
                rsp_body = response.read().decode('utf-8')
            rsp_dict = json.loads(rsp_body)
        except Exception as e:
            logger.error('send message error: %s', _error_detail(e))
            return

        code = rsp_dict.get("code", -1)
        if code != 0:
            logger.error("send message error, code = %s, msg =%s", code, rsp_dict.get("msg", ""))

    def msg_compoment(self, token, open_id, text, msg_type=None, content=None):
        """Thin wrapper that forwards to :meth:`send_message`."""
        self.send_message(token, open_id, text, msg_type, content)


def run():
    """Serve callbacks on port 5000 of all interfaces until interrupted."""
    port = 5000
    server_address = ('', port)
    httpd = HTTPServer(server_address, RequestHandler)
    logger.info("start...")
    httpd.serve_forever()


if __name__ == '__main__':
    run()