Merge branch 'develop' of github.com:looching/dsite into develop

This commit is contained in:
Ching 2022-01-15 13:17:25 +08:00
commit 95f70c1478
10 changed files with 309 additions and 0 deletions

View File

@ -30,3 +30,4 @@ ua-parser==0.10.0
user-agents==2.2.0 user-agents==2.2.0
wcwidth==0.2.5 wcwidth==0.2.5
zipp==3.5.0 zipp==3.5.0
redis==4.1.0

267
scripts/dodo.py Normal file
View File

@ -0,0 +1,267 @@
#!/usr/bin/env python
# --coding:utf-8--
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from urllib import request
import hashlib
import base64
from Crypto.Cipher import AES
from mastodon import Mastodon
import logging
import redis
import requests
import time
APP_VERIFICATION_TOKEN = 'uKQQiOVMYg2cTgrjkyBmodrHTUaCXzG3'
APP_ID = 'cli_a115fe8b83f9100c'
APP_SECRET = 'yuSQenId0VfvwdZ3qL9wMd8FpCMEUL0u'
ENCRYPT_KEY = '4XfjcA5xou3pztBD4g5V7dgHtr0BBYDE'
EVENT_TYPE = ['im.message.receive_v1']
ADD_GROUP_NAME = True
KEDAI_ID = '107263380636355825'
logging.basicConfig(filename='/root/develop/log/dodo.log', level=logging.INFO)
logger = logging.getLogger('/root/develop/log/dodo.log')
mastodon = Mastodon(
access_token = 'Ug_bUMWCk3RLamOnqYIytmeB0nO6aNfjdmf06mAj2bE',
api_base_url = 'https://nofan.xyz'
)
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_cli = redis.Redis(host='localhost', port=6379, decode_responses=True)
class AESCipher(object):
def __init__(self, key):
self.bs = AES.block_size
self.key=hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
@staticmethod
def str_to_bytes(data):
u_type = type(b"".decode('utf8'))
if isinstance(data, u_type):
return data.encode('utf8')
return data
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
def decrypt(self, enc):
iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size:]))
def decrypt_string(self, enc):
enc = base64.b64decode(enc)
return self.decrypt(enc).decode('utf8')
def get_tenant_access_token(): # 获取token
token = redis_cli.get('tenant_access_token_%s' % APP_ID)
if token:
return token
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
headers = {
"Content-Type": "application/json"
}
req_body = {
"app_id": APP_ID,
"app_secret": APP_SECRET
}
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
try:
response = request.urlopen(req)
except Exception as e:
logger.error('get tenant token error: %s', e.read().decode())
return ""
rsp_body = response.read().decode('utf-8')
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
logger.error("get tenant_access_token error, code =%s", code)
return ""
token = redis_cli.set('tenant_access_token_%s' % APP_ID,
rsp_dict.get("tenant_access_token", ""),
ex=60*30)
return token
def get_group_name(chat_id):
group_name = redis_cli.get('group_name_%s' % chat_id)
if not group_name:
url = "https://open.feishu.cn/open-apis/im/v1/chats/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + get_tenant_access_token()
}
try:
resp = requests.get(url+chat_id, headers=headers)
resp_data = resp.json()
code = resp_data.get("code", -1)
if code == 0:
group_name = resp_data.get('data', {}).get('name')
redis_cli.set('group_name_%s' % chat_id,
group_name,
ex=60*60*24)
except:
# todo: log
return
return group_name
class RequestHandler(BaseHTTPRequestHandler):
def do_POST(self):
# 解析请求 body
req_body = self.rfile.read(int(self.headers['content-length']))
obj = json.loads(req_body.decode("utf-8"))
cipher = AESCipher(ENCRYPT_KEY)
obj = json.loads(cipher.decrypt_string(obj['encrypt']))
logger.info('lark request body: %s', obj)
# 校验 verification token 是否匹配token 不匹配说明该回调并非来自开发平台
token = obj.get("token", "")
if not token:
token = obj.get('header', {}).get('token', '')
if token != APP_VERIFICATION_TOKEN:
logger.error("verification token not match, token =%s", token)
self.response("")
return
# 根据 type 处理不同类型事件
type = obj.get("type", "")
if not type:
type = obj.get('header', {}).get('event_type')
if "url_verification" == type: # 验证请求 URL 是否有效
self.handle_request_url_verify(obj)
elif type in EVENT_TYPE: # 事件回调
# 获取事件内容和类型,并进行相应处理,此处只关注给机器人推送的消息事件
event_id = obj.get('header', {}).get('event_id', '')
# 重复收到的事件不处理
if event_id and redis_cli.get(event_id):
self.response("")
return
event = obj.get("event")
if event.get("message"):
self.handle_message(event, event_id)
return
return
def handle_request_url_verify(self, post_obj):
# 原样返回 challenge 字段内容
challenge = post_obj.get("challenge", "")
rsp = {'challenge': challenge}
self.response(json.dumps(rsp))
return
def handle_message(self, event, event_id=None):
# 此处只处理 text 类型消息,其他类型消息忽略
msg = event.get('message', {})
msg_type = msg.get("message_type", "")
if msg_type == "text":
# 调用发消息 API 之前,先要获取 API 调用凭证tenant_access_token
access_token = get_tenant_access_token()
if access_token == "":
self.response("")
return
# 机器人回复收到的消息
text = json.loads(msg.get('content')).get('text')
orig_text = text
if msg.get('chat_type') == 'group' and msg.get('mentions'):
open_id = {"open_chat_id": msg.get("chat_id")}
for mention in msg.get('mentions'):
text = text.replace(mention['key'], '')
text = text.lstrip()
orig_text = text
if ADD_GROUP_NAME:
group_name = get_group_name(msg.get("chat_id"))
text = '%s #%s' % (text, group_name)
else:
open_id = {"open_id": event.get("sender", {}).get(
'sender_id', {}).get('open_id')}
self.response("")
if orig_text == '/last':
try:
statuses = mastodon.account_statuses(KEDAI_ID, limit=1)
self.msg_compoment(access_token, open_id,
statuses[0]['content'])
except Exception as exc:
logger.error('operation error: %s', str(exc))
elif orig_text == '/del':
try:
statuses = mastodon.account_statuses(KEDAI_ID, limit=1)
Mastodon.status_delete(statuses[0]['id'])
self.msg_compoment(access_token, open_id,
'已删除: ' + statuses[0]['content'])
except Exception as exc:
logger.error('operation error: %s', str(exc))
try:
toot_resp = mastodon.status_post(text)
if toot_resp.get('id'):
self.msg_compoment(access_token, open_id, '📟 dodo 📟')
redis_cli.set(event_id, int(time.time()), ex=60*60*7)
else:
self.msg_compoment(access_token, open_id, """⚠️ didi ⚠️
%s
""" % json.loads(toot_resp))
except Exception as exc:
logger.error('send toot error: %s', str(exc))
return
elif msg_type == "image":
self.response("")
return
def response(self, body):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(body.encode())
def send_message(self, token, open_id, text):
url = "https://open.feishu.cn/open-apis/message/v4/send/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + token
}
req_body = {
"msg_type": "text",
"content": {
"text": text
}
}
req_body = dict(req_body, **open_id) # 根据open_id判断返回域
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
try:
response = request.urlopen(req)
except Exception as e:
logger.error('send message error: %s', e.read().decode())
return
rsp_body = response.read().decode('utf-8')
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
logger.error("send message error, code = %s, msg =%s",
code,
rsp_dict.get("msg", ""))
def msg_compoment(self, token, open_id, text):
self.send_message(token, open_id, text)
def run():
port = 5000
server_address = ('', port)
httpd = HTTPServer(server_address, RequestHandler)
logger.info("start...")
httpd.serve_forever()
if __name__ == '__main__':
run()

0
toot/__init__.py Normal file
View File

3
toot/admin.py Normal file
View File

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
toot/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class TootConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'toot'

View File

1
toot/models.py Normal file
View File

@ -0,0 +1 @@
from django.db import models

3
toot/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

3
toot/views.py Normal file
View File

@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.

25
utils/depoly_notify.py Executable file
View File

@ -0,0 +1,25 @@
# -*- coding: UTF-8 -*-
import sys
import os
from imp import reload
sys.path.insert(0, os.path.abspath('..'))
sys.path.append('/Users/ching/develop/dsite')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "../dsite.settings")
reload(sys)
import utils.lark
import git
import ipdb
if __name__ == '__main__':
# def notify():
repo = git.Repo(search_parent_directories=True)
commit = repo.head.commit
rev, branch = commit.name_rev.split(' ')
msg = 'rev: %s\n\nauther: %s\n\nbranch: %s\n\nmessage: %s' % (
rev,
commit.author.name,
branch,
commit.summary
)
print(utils.lark.request({'text': msg}))