import json
import logging
from urllib import request
from urllib.error import HTTPError

import redis
import requests

from dsite import settings
from utils import const

# logging.basicConfig(filename='/root/develop/log/dodo.log', level=logging.INFO)
# logger = logging.getLogger('/root/develop/log/dodo.log')
logger = logging.getLogger('dsite.' + __name__)

APP_ID = settings.LARK_APP_ID
APP_SECRET = settings.LARK_APP_SECRET

# A single shared pool; the client below borrows its connections from it
# instead of silently building a second, identical pool.
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_cli = redis.Redis(connection_pool=pool)


class Feishu:
    """Small client for sending messages through the Feishu open API.

    The tenant access token is looked up (Redis first, then the auth
    endpoint) once, when the instance is created, and kept on ``self.token``.
    """

    def __init__(self):
        self.token = self.get_tenant_access_token()

    def _post_json(self, url, headers, req_body, error_fmt):
        """POST ``req_body`` as JSON and return the decoded JSON response.

        Args:
            url (str): endpoint to call.
            headers (dict): HTTP request headers.
            req_body (dict): payload, serialized with ``json.dumps``.
            error_fmt (str): log format with one ``%s`` placeholder, used
                when opening the request fails.

        Returns:
            The parsed response body, or ``None`` when the request could not
            be opened (the failure is logged, not raised).

        Raises:
            ValueError: if the response body is not valid JSON
                (propagated from ``json.loads``).
        """
        data = json.dumps(req_body).encode('utf8')
        req = request.Request(url=url, data=data, headers=headers, method='POST')
        # NOTE: no timeout is passed, so this call relies on the global
        # socket default timeout, if any.
        try:
            response = request.urlopen(req)
        except HTTPError as e:
            # Only HTTPError carries a response body worth logging.
            logger.error(error_fmt, e.read().decode('utf-8', errors='replace'))
            return None
        except Exception as e:
            # URLError, socket errors, etc. have no .read(); log the
            # exception itself rather than failing inside the handler.
            logger.error(error_fmt, e)
            return None
        # Close the HTTP response once the body has been consumed.
        with response:
            rsp_body = response.read().decode('utf-8')
        return json.loads(rsp_body)

    def get_tenant_access_token(self):
        """Return the tenant access token, using Redis as a cache.

        Returns:
            str: the token, or ``""`` when it could not be obtained
            (request failure or a non-zero ``code`` in the response).
        """
        cache_key = 'tenant_access_token_%s' % APP_ID
        # Fetch the token from the cache first.
        token = redis_cli.get(cache_key)
        if token:
            return token

        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
        headers = {"Content-Type": "application/json"}
        req_body = {"app_id": APP_ID, "app_secret": APP_SECRET}
        rsp_dict = self._post_json(url, headers, req_body,
                                   'get tenant token error: %s')
        if rsp_dict is None:
            return ""

        code = rsp_dict.get("code", -1)
        if code != 0:
            logger.error("get tenant_access_token error, code =%s", code)
            return ""

        token = rsp_dict.get("tenant_access_token", "")
        if token:
            # Cache for 30 minutes; never cache an empty token.
            redis_cli.set(cache_key, token, ex=60 * 30)
        return token

    def send_message(self, open_id, text, msg_type=None, content=None):
        """Send a text or interactive (card) message.

        Args:
            open_id (dict): mapping merged into the request body; its keys
                select the recipient field. Presumably something like
                ``{"open_id": "..."}`` -- verify against callers.
            text (str): message text, used when the type is text.
            msg_type: one of the ``const.LARK_WEBHOOK_MSG_TYPE_*`` values;
                a falsy value means text.
            content: card payload, used when the type is interactive.

        Returns:
            None. Failures are logged, not raised.
        """
        url = "https://open.feishu.cn/open-apis/message/v4/send/"
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + self.token,
        }
        if not msg_type:
            msg_type = const.LARK_WEBHOOK_MSG_TYPE_TEXT

        req_body = {"msg_type": msg_type}
        if msg_type == const.LARK_WEBHOOK_MSG_TYPE_TEXT:
            req_body['content'] = {'text': text}
        elif msg_type == const.LARK_WEBHOOK_MSG_TYPE_INTERACTIVE:
            req_body['card'] = content
        # The recipient field(s) come from the open_id mapping.
        req_body = dict(req_body, **open_id)

        rsp_dict = self._post_json(url, headers, req_body,
                                   'send message error: %s')
        if rsp_dict is None:
            return

        code = rsp_dict.get("code", -1)
        if code != 0:
            logger.error("send message error, code = %s, msg =%s",
                         code, rsp_dict.get("msg", ""))

    def msg_compoment(self, open_id, text, msg_type=None, content=None):
        """Forward all arguments unchanged to :meth:`send_message`."""
        self.send_message(open_id, text, msg_type, content)

    def send_web_hook_message(self, web_hook_url, content):
        """Send a webhook message.

        Args:
            web_hook_url (str): web hook url
            content (dict): message content, e.g.
                {"msg_type":"text","content":{"text":"request example"}}

        Returns:
            resp: response object
        """
        # NOTE: no timeout is passed to requests.post here.
        return requests.post(web_hook_url, json=content)