dsite/scripts/dodo.py
Ching 4a6d2f82e0 feat(scripts folder): [A]增加嘟嘟机脚本
[A]增加嘟嘟机脚本

Signed-off-by: Ching <loooching@gmail.com>
2022-01-05 11:28:02 +08:00

186 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from urllib import request
import hashlib
import base64
from Crypto.Cipher import AES
# from utils import get_tenant_access_token, isreciept
# from Function import *
# from Private import APP_VERIFICATION_TOKEN
# NOTE(review): the credentials below are committed in plain text. The
# commented-out `from Private import APP_VERIFICATION_TOKEN` import above
# suggests they were once kept in a separate module -- consider loading them
# from private configuration again and rotating these values.

# Token compared against the `token` field of every incoming callback.
APP_VERIFICATION_TOKEN = 'uKQQiOVMYg2cTgrjkyBmodrHTUaCXzG3'
# App credentials posted to the tenant_access_token endpoint.
APP_ID = 'cli_a115fe8b83f9100c'
APP_SECRET = 'yuSQenId0VfvwdZ3qL9wMd8FpCMEUL0u'
# Key handed to AESCipher to decrypt the `encrypt` field of callback bodies.
ENCRYPT_KEY = '4XfjcA5xou3pztBD4g5V7dgHtr0BBYDE'
# Event types that the POST handler treats as event callbacks.
EVENT_TYPE = ['im.message.receive_v1']
class AESCipher(object):
    """AES-CBC decryptor for base64-encoded callback payloads.

    The AES key is the SHA-256 digest of the configured key string, and the
    first ``AES.block_size`` bytes of each ciphertext are used as the IV.
    """

    def __init__(self, key):
        """Derive the AES key from *key* (``str`` or ``bytes``)."""
        self.bs = AES.block_size
        self.key = hashlib.sha256(AESCipher.str_to_bytes(key)).digest()

    @staticmethod
    def str_to_bytes(data):
        """Return *data* UTF-8 encoded if it is a ``str``, else unchanged."""
        if isinstance(data, str):
            return data.encode('utf8')
        return data

    @staticmethod
    def _unpad(s):
        """Strip trailing padding whose length is given by the last byte.

        Raises:
            ValueError: if *s* is empty, so there is no padding byte to read.
        """
        if not s:
            # Without this guard ord() fails with an unhelpful TypeError.
            raise ValueError("cannot unpad empty data")
        return s[:-ord(s[-1:])]

    def decrypt(self, enc):
        """Decrypt raw bytes *enc* (IV followed by ciphertext) and unpad."""
        iv = enc[:AES.block_size]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return self._unpad(cipher.decrypt(enc[AES.block_size:]))

    def decrypt_string(self, enc):
        """Base64-decode *enc*, decrypt it and return the UTF-8 text."""
        enc = base64.b64decode(enc)
        return self.decrypt(enc).decode('utf8')
def get_tenant_access_token():
    """Fetch a tenant access token for the configured app.

    Posts ``APP_ID`` / ``APP_SECRET`` as JSON to the
    ``tenant_access_token/internal`` endpoint on open.feishu.cn.

    Returns:
        The ``tenant_access_token`` value from the response, or ``""`` when
        the request fails, the body is not valid JSON, or the response
        ``code`` is non-zero.
    """
    # Local import: the module only imports ``urllib.request`` at file level.
    from urllib.error import HTTPError

    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
    headers = {
        "Content-Type": "application/json"
    }
    req_body = {
        "app_id": APP_ID,
        "app_secret": APP_SECRET
    }
    data = bytes(json.dumps(req_body), encoding='utf8')
    req = request.Request(url=url, data=data, headers=headers, method='POST')
    try:
        # The context manager closes the connection once the body is read.
        with request.urlopen(req) as response:
            rsp_body = response.read().decode('utf-8')
    except HTTPError as e:
        # Only HTTPError carries a response body that can be read.
        print(e.read().decode())
        return ""
    except Exception as e:
        # Best-effort: connection errors, timeouts, etc. have no body.
        print(e)
        return ""
    try:
        rsp_dict = json.loads(rsp_body)
    except ValueError:
        print("get tenant_access_token error, invalid response body:", rsp_body)
        return ""
    code = rsp_dict.get("code", -1)
    if code != 0:
        print("get tenant_access_token error, code =", code)
        return ""
    return rsp_dict.get("tenant_access_token", "")
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler for encrypted bot event callbacks.

    Each POST body is decrypted with ``ENCRYPT_KEY``, its token is checked
    against ``APP_VERIFICATION_TOKEN`` and the payload is dispatched by
    event type. Text messages are echoed back to the chat or the sender.
    Every code path sends exactly one HTTP response.
    """

    def do_POST(self):
        """Decrypt, verify and dispatch one callback request."""
        # Parse and decrypt the request body; reject anything malformed
        # with a 400 instead of dying without a reply.
        try:
            length = int(self.headers['content-length'])
            if length < 0:
                raise ValueError("negative content-length")
            req_body = self.rfile.read(length)
            obj = json.loads(req_body.decode("utf-8"))
            cipher = AESCipher(ENCRYPT_KEY)
            obj = json.loads(cipher.decrypt_string(obj['encrypt']))
            if not isinstance(obj, dict):
                raise ValueError("decrypted payload is not a JSON object")
        except (KeyError, TypeError, ValueError) as e:
            print("malformed callback body:", e)
            self.send_error(400)
            return
        print(req_body)
        print(obj)
        # Check the verification token; a mismatch means the callback did
        # not come from the open platform. The token is either top-level
        # or nested under "header".
        token = obj.get("token", "")
        if not token:
            token = obj.get('header', {}).get('token', '')
        if token != APP_VERIFICATION_TOKEN:
            print("verification token not match, token =", token)
            self.response("")
            return
        # Dispatch on the event type (top-level "type" or header.event_type).
        event_type = obj.get("type", "")
        if not event_type:
            event_type = obj.get('header', {}).get('event_type')
        if event_type == "url_verification":
            # Request-URL validity check.
            self.handle_request_url_verify(obj)
            return
        if event_type in EVENT_TYPE:
            # Event callback: only message events pushed to the bot matter.
            event = obj.get("event") or {}
            if event.get("message"):
                self.handle_message(event)
                return
        # Acknowledge everything else so the sender still gets a reply.
        self.response("")

    def handle_request_url_verify(self, post_obj):
        """Reply with the ``challenge`` field echoed back unchanged."""
        challenge = post_obj.get("challenge", "")
        rsp = {'challenge': challenge}
        self.response(json.dumps(rsp))

    def handle_message(self, event):
        """Echo a received text message; acknowledge any other type.

        Args:
            event: the ``event`` object of a message callback.
        """
        msg = event.get('message', {})
        msg_type = msg.get("message_type", "")
        if msg_type != "text":
            # Only text messages are processed; images and all other
            # types are simply acknowledged.
            self.response("")
            return
        # An API credential (tenant_access_token) is needed before the
        # send-message API can be called.
        access_token = get_tenant_access_token()
        if access_token == "":
            self.response("")
            return
        try:
            text = json.loads(msg.get('content')).get('text')
        except (AttributeError, TypeError, ValueError):
            text = None
        if not isinstance(text, str):
            print("text message without usable content, message =", msg)
            self.response("")
            return
        if msg.get('chat_type') == 'group' and msg.get('mentions'):
            # Group chat with mentions: reply to the chat and strip the
            # mention placeholders from the echoed text.
            open_id = {"open_chat_id": msg.get("chat_id")}
            for mention in msg.get('mentions'):
                key = mention.get('key')
                if key:
                    text = text.replace(key, '')
            text = text.lstrip()
        else:
            # Otherwise reply directly to the sender.
            open_id = {"open_id": event.get("sender", {}).get(
                'sender_id', {}).get('open_id')}
        self.msg_compoment(access_token, open_id, text)
        self.response("")

    def response(self, body):
        """Send a 200 JSON response whose payload is the string *body*."""
        self.send_response(200)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        self.wfile.write(body.encode())

    def send_message(self, token, open_id, text):
        """Send *text* through the message/v4/send API.

        Args:
            token: tenant access token used as the Bearer credential.
            open_id: dict merged into the request body to select the
                recipient (``{"open_id": ...}`` or ``{"open_chat_id": ...}``).
            text: message text to send.

        Failures are printed and never raised, so the caller can still
        acknowledge the callback.
        """
        # Local import: the module only imports ``urllib.request``.
        from urllib.error import HTTPError

        url = "https://open.feishu.cn/open-apis/message/v4/send/"
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + token
        }
        req_body = {
            "msg_type": "text",
            "content": {
                "text": text
            }
        }
        # The recipient key in open_id decides where the reply goes.
        req_body = dict(req_body, **open_id)
        data = bytes(json.dumps(req_body), encoding='utf8')
        req = request.Request(url=url, data=data, headers=headers, method='POST')
        try:
            # The context manager closes the connection after reading.
            with request.urlopen(req) as response:
                rsp_body = response.read().decode('utf-8')
        except HTTPError as e:
            # Only HTTPError carries a response body that can be read.
            print(e.read().decode())
            return
        except Exception as e:
            # Best-effort: connection errors, timeouts, etc. have no body.
            print(e)
            return
        try:
            rsp_dict = json.loads(rsp_body)
        except ValueError:
            print("send message error, invalid response body:", rsp_body)
            return
        code = rsp_dict.get("code", -1)
        if code != 0:
            print("send message error, code = ", code, ", msg =", rsp_dict.get("msg", ""))

    def msg_compoment(self, token, open_id, text):
        """Build the reply for *text*; currently just forwards it as-is."""
        self.send_message(token, open_id, text)
def run(port=5000):
    """Start the callback HTTP server and serve until interrupted.

    Args:
        port: TCP port to listen on; the host is left empty (``''``).
            Defaults to 5000.
    """
    server_address = ('', port)
    # The context manager closes the listening socket when serving stops,
    # including when serve_forever() is interrupted by an exception.
    with HTTPServer(server_address, RequestHandler) as httpd:
        print("start.....")
        httpd.serve_forever()


if __name__ == '__main__':
    run()