style(*.py): use Black as python code formatter

use Black as python code formatter

Signed-off-by: Ching <loooching@gmail.com>
This commit is contained in:
Ching 2022-01-22 18:02:32 +08:00
parent beb10e00a9
commit f683341f66
20 changed files with 529 additions and 614 deletions

13
.vscode/settings.json vendored
View File

@ -1,2 +1,13 @@
{
}
"python.formatting.provider": "black",
"python.formatting.blackArgs": [
"-S",
"-C",
"-l 120"
],
"editor.formatOnPaste": true,
"editor.formatOnSave": true,
"editor.rulers": [
120
]
}

View File

@ -2,12 +2,11 @@
from rest_framework import pagination
from rest_framework.response import Response
class PagePaginationWithPageCount(pagination.PageNumberPagination):
page_size_query_param = 'page_size'
def get_paginated_response(self, data):
response = super().get_paginated_response(data)
response.data['page_count'] = self.page.paginator.num_pages
return response
page_size_query_param = 'page_size'
def get_paginated_response(self, data):
response = super().get_paginated_response(data)
response.data['page_count'] = self.page.paginator.num_pages
return response

View File

@ -37,14 +37,11 @@ INSTALLED_APPS = [
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
# third party
'corsheaders',
'rest_framework',
'rest_framework.authtoken',
'django_filters',
# user apps
'timer',
'recipe',
@ -74,9 +71,9 @@ TEMPLATES = [
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
]
},
},
}
]
WSGI_APPLICATION = 'dsite.wsgi.application'
@ -85,30 +82,17 @@ WSGI_APPLICATION = 'dsite.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.11/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
}
}
DATABASES = {'default': {'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'db.sqlite3')}}
# Password validation
# https://docs.djangoproject.com/en/1.11/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
{'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'},
{'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator'},
{'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'},
{'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator'},
]
@ -140,11 +124,10 @@ REST_FRAMEWORK = {
# 'DEFAULT_PERMISSION_CLASSES': [
# 'rest_framework.permissions.IsAuthenticatedOrReadOnly'
# ],
# 'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'DEFAULT_PAGINATION_CLASS': 'core.pagination.PagePaginationWithPageCount',
'PAGE_SIZE': 10,
'DEFAULT_FILTER_BACKENDS': ['django_filters.rest_framework.DjangoFilterBackend']
'DEFAULT_FILTER_BACKENDS': ['django_filters.rest_framework.DjangoFilterBackend'],
}
# CORS
@ -153,7 +136,7 @@ REST_FRAMEWORK = {
# 'http://localhost:8080',
# )
CORS_ALLOWED_ORIGINS = []
CORS_ALLOW_CREDENTIALS = True # 允许携带cookie
CORS_ALLOW_CREDENTIALS = True # 允许携带cookie
# CORS_ALLOW_ALL_ORIGINS = False # If this is used then `CORS_ALLOWED_ORIGINS` will not have any effect

View File

@ -7,8 +7,7 @@ class Migration(migrations.Migration):
initial = True
dependencies = [
]
dependencies = []
operations = [
migrations.CreateModel(
@ -21,5 +20,5 @@ class Migration(migrations.Migration):
('rate', models.IntegerField(default=0)),
('difficulty', models.IntegerField(default=0)),
],
),
)
]

View File

@ -5,15 +5,11 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('recipe', '0001_initial'),
]
dependencies = [('recipe', '0001_initial')]
operations = [
migrations.AlterField(
model_name='recipe',
name='recipe_type',
field=models.CharField(default='meat', max_length=32),
model_name='recipe', name='recipe_type', field=models.CharField(default='meat', max_length=32)
),
migrations.CreateModel(
name='DailyRecipe',

View File

@ -5,226 +5,178 @@ import datetime
from utils import const
import utils
class Recipe(models.Model):
name = models.CharField(max_length=128)
recipe_type = models.CharField(max_length=32,
default=const.RECIPE_TYPE_MEAT)
note = models.TextField(null=True)
rate = models.IntegerField(default=0)
difficulty = models.IntegerField(default=0)
name = models.CharField(max_length=128)
recipe_type = models.CharField(max_length=32, default=const.RECIPE_TYPE_MEAT)
note = models.TextField(null=True)
rate = models.IntegerField(default=0)
difficulty = models.IntegerField(default=0)
def serialize(self, verbose=False):
data = {
'id': self.id,
'name': self.name,
'recipe_type': self.recipe_type,
}
if verbose:
data.update({
'difficulty': self.difficulty,
'rate': self.rate,
'note': self.note,
})
def serialize(self, verbose=False):
data = {'id': self.id, 'name': self.name, 'recipe_type': self.recipe_type}
if verbose:
data.update({'difficulty': self.difficulty, 'rate': self.rate, 'note': self.note})
return data
return data
@classmethod
def create_from_str(cls, content):
content = content.strip()
if not content:
return
name, *data = content.split(' ')
recipe_type = rate = difficulty = None
keys = [recipe_type, rate, difficulty]
for x in range(len(data)):
keys[x] = data[x]
recipe_type, rate, difficulty = keys
@classmethod
def create_from_str(cls, content):
content = content.strip()
if not content:
return
name, *data = content.split(' ')
recipe_type = rate = difficulty = None
keys = [recipe_type, rate, difficulty]
for x in range(len(data)):
keys[x] = data[x]
recipe_type, rate, difficulty = keys
if recipe_type == '':
recipe_type = const.RECIPE_TYPE_MEAT
elif recipe_type == '':
recipe_type = const.RECIPE_TYPE_VEGETABLE
elif recipe_type == '':
recipe_type = const.RECIPE_TYPE_SOUP
else:
recipe_type = const.RECIPE_TYPE_MEAT
if rate:
try:
rate = int(rate)
except:
rate = 0
else:
rate = 0
if difficulty:
try:
difficulty = int(difficulty)
except:
difficulty = 0
else:
difficulty = 0
recipe = cls.objects.create(
name=name,
recipe_type=recipe_type,
rate=rate,
difficulty=difficulty
)
return recipe
if recipe_type == '':
recipe_type = const.RECIPE_TYPE_MEAT
elif recipe_type == '':
recipe_type = const.RECIPE_TYPE_VEGETABLE
elif recipe_type == '':
recipe_type = const.RECIPE_TYPE_SOUP
else:
recipe_type = const.RECIPE_TYPE_MEAT
if rate:
try:
rate = int(rate)
except:
rate = 0
else:
rate = 0
if difficulty:
try:
difficulty = int(difficulty)
except:
difficulty = 0
else:
difficulty = 0
recipe = cls.objects.create(name=name, recipe_type=recipe_type, rate=rate, difficulty=difficulty)
return recipe
@property
def display_recipe_type(self):
if self.recipe_type == const.RECIPE_TYPE_VEGETABLE:
return ''
elif self.recipe_type == const.RECIPE_TYPE_MEAT:
return ''
elif self.recipe_type == const.RECIPE_TYPE_SOUP:
return ''
return ''
@property
def display_recipe_type(self):
if self.recipe_type == const.RECIPE_TYPE_VEGETABLE:
return ''
elif self.recipe_type == const.RECIPE_TYPE_MEAT:
return ''
elif self.recipe_type == const.RECIPE_TYPE_SOUP:
return ''
return ''
@property
def display_rate(self):
return '🍚' * self.rate
@property
def display_rate(self):
return '🍚' * self.rate
@property
def display_difficult(self):
return '⭐️' * self.difficulty
@property
def display_difficult(self):
return '⭐️' * self.difficulty
def construct_lart_card(self):
def construct_lart_card(self):
data = {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": self.name,
},
"template": "blue"
},
"elements": [
{
"tag": "markdown",
"content": "**类型**%s\n**评分**%s\n**难度**%s" % (
self.display_recipe_type, self.display_rate, self.display_difficult)
},
{
"tag": "action",
"actions": [
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "查看"
},
"multi_url": {
"url": "https://recipe.tunpok.com/recipe/%s" % self.id,
"android_url": "https://recipe.tunpok.com/recipe-mobile/recipe/%s" % self.id,
"ios_url": "https://recipe.tunpok.com/recipe-mobile/recipe/%s" % self.id,
"pc_url": "https://recipe.tunpok.com/recipe/%s" % self.id
},
"type": "primary"
},
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "删除"
},
"type": "danger"
}
]
data = {
"config": {"wide_screen_mode": True},
"header": {"title": {"tag": "plain_text", "content": self.name}, "template": "blue"},
"elements": [
{
"tag": "markdown",
"content": "**类型**%s\n**评分**%s\n**难度**%s"
% (self.display_recipe_type, self.display_rate, self.display_difficult),
},
{
"tag": "action",
"actions": [
{
"tag": "button",
"text": {"tag": "plain_text", "content": "查看"},
"multi_url": {
"url": "https://recipe.tunpok.com/recipe/%s" % self.id,
"android_url": "https://recipe.tunpok.com/recipe-mobile/recipe/%s" % self.id,
"ios_url": "https://recipe.tunpok.com/recipe-mobile/recipe/%s" % self.id,
"pc_url": "https://recipe.tunpok.com/recipe/%s" % self.id,
},
"type": "primary",
},
{"tag": "button", "text": {"tag": "plain_text", "content": "删除"}, "type": "danger"},
],
},
],
}
]
}
return data
return data
class DailyRecipe(models.Model):
recipes = models.ManyToManyField(Recipe)
date = models.DateField()
meal_type = models.CharField(
max_length=32,
default=const.DAILY_RECIPE_MEAL_TYPE_SUPPER,)
recipes = models.ManyToManyField(Recipe)
date = models.DateField()
meal_type = models.CharField(max_length=32, default=const.DAILY_RECIPE_MEAL_TYPE_SUPPER)
def generate_recipe(self, prev_recipes=None, ignore_prev=False):
if not prev_recipes:
prev_recipes = []
if ignore_prev:
prev_recipes = []
def generate_recipe(self, prev_recipes=None, ignore_prev=False):
if not prev_recipes:
prev_recipes = []
if ignore_prev:
prev_recipes = []
recipes = []
retry = 5
recipes = []
retry = 5
# meat
for x in range(0,2):
while True:
recipe = Recipe.objects.filter(
recipe_type=const.RECIPE_TYPE_MEAT,
).order_by('?').first()
if recipe and recipe.id not in recipes and recipe.id not in prev_recipes:
recipes.append(recipe.id)
break
else:
retry -= 1
if retry <= 0:
retry = 5
break
# meat
for x in range(0, 2):
while True:
recipe = Recipe.objects.filter(recipe_type=const.RECIPE_TYPE_MEAT).order_by('?').first()
if recipe and recipe.id not in recipes and recipe.id not in prev_recipes:
recipes.append(recipe.id)
break
else:
retry -= 1
if retry <= 0:
retry = 5
break
# vegetable
for x in range(0, 1):
while True:
recipe = Recipe.objects.filter(
recipe_type=const.RECIPE_TYPE_VEGETABLE,
).order_by('?').first()
if recipe and recipe.id not in recipes and recipe.id not in prev_recipes:
recipes.append(recipe.id)
break
else:
retry -= 1
if retry <= 0:
retry = 5
break
# vegetable
for x in range(0, 1):
while True:
recipe = Recipe.objects.filter(recipe_type=const.RECIPE_TYPE_VEGETABLE).order_by('?').first()
if recipe and recipe.id not in recipes and recipe.id not in prev_recipes:
recipes.append(recipe.id)
break
else:
retry -= 1
if retry <= 0:
retry = 5
break
# soup
if random.randint(0,2):
recipe = Recipe.objects.filter(
recipe_type=const.RECIPE_TYPE_SOUP,
).order_by('?').first()
# if recipe not in recipes and recipe not in prev_recipes:
if recipe:
recipes.append(recipe.id)
# soup
if random.randint(0, 2):
recipe = Recipe.objects.filter(recipe_type=const.RECIPE_TYPE_SOUP).order_by('?').first()
# if recipe not in recipes and recipe not in prev_recipes:
if recipe:
recipes.append(recipe.id)
self.recipes.set(Recipe.objects.filter(id__in=recipes))
self.recipes.set(Recipe.objects.filter(id__in=recipes))
def serialize(self):
data = {
const.RECIPE_TYPE_MEAT: [],
const.RECIPE_TYPE_VEGETABLE: [],
const.RECIPE_TYPE_SOUP: [],
}
for key_, value_ in data.items():
for recipe in self.recipes.filter(
recipe_type=key_).order_by('id'):
value_.append(recipe.serialize())
def serialize(self):
data = {const.RECIPE_TYPE_MEAT: [], const.RECIPE_TYPE_VEGETABLE: [], const.RECIPE_TYPE_SOUP: []}
for key_, value_ in data.items():
for recipe in self.recipes.filter(recipe_type=key_).order_by('id'):
value_.append(recipe.serialize())
date = now()
date = date.replace(year=self.date.year, month=self.date.month,
day=self.date.day, hour=0, minute=0)
data['date'] = utils.timestamp_of(utils.day_start(date))
data['id'] = self.id
return data
date = now()
date = date.replace(year=self.date.year, month=self.date.month, day=self.date.day, hour=0, minute=0)
data['date'] = utils.timestamp_of(utils.day_start(date))
data['id'] = self.id
return data
@classmethod
def get_week_recipe_data(cls):
today = localtime()
week_start = (today - datetime.timedelta(days=today.weekday())).date()
week_end = week_start + datetime.timedelta(days=6)
daily_recipes = cls.objects.filter(
date__gte=week_start,
date__lte=week_end,
).order_by('date')
data = [{}] * (7 - len(daily_recipes))
@classmethod
def get_week_recipe_data(cls):
today = localtime()
week_start = (today - datetime.timedelta(days=today.weekday())).date()
week_end = week_start + datetime.timedelta(days=6)
daily_recipes = cls.objects.filter(date__gte=week_start, date__lte=week_end).order_by('date')
data = [{}] * (7 - len(daily_recipes))
for daily_recipe in daily_recipes:
data.append(daily_recipe.serialize())
return data
for daily_recipe in daily_recipes:
data.append(daily_recipe.serialize())
return data

View File

@ -4,23 +4,25 @@ from rest_framework import serializers
import recipe.models
class RecipeSerializer(serializers.ModelSerializer):
id = serializers.IntegerField(read_only=True)
class Meta:
model = recipe.models.Recipe
fields = '__all__'
id = serializers.IntegerField(read_only=True)
class Meta:
model = recipe.models.Recipe
fields = '__all__'
class WeekRecipeSerializer(serializers.ModelSerializer):
class Meta:
model = recipe.models.DailyRecipe
fields = '__all__'
class Meta:
model = recipe.models.DailyRecipe
fields = '__all__'
class DailyRecipeSerializer(serializers.ModelSerializer):
id = serializers.IntegerField(read_only=True)
recipes = RecipeSerializer(many=True)
class Meta:
model = recipe.models.DailyRecipe
fields = '__all__'
id = serializers.IntegerField(read_only=True)
recipes = RecipeSerializer(many=True)
class Meta:
model = recipe.models.DailyRecipe
fields = '__all__'

View File

@ -1,4 +1,5 @@
from django.conf.urls import include, url
# from django.core.urlresolvers import reverse
from django.urls import path
from rest_framework import routers
@ -11,6 +12,5 @@ urlpatterns = [
url(r'^recipe/(?P<pk>\d+)$', views.RecipeAPI.as_view(), name='recipe-detail'),
url(r'^recipe/$', views.RecipeListAPI.as_view()),
url(r'^week-recipe/$', views.WeekRecipeListAPI.as_view()),
url(r'^daily-recipe/(?P<pk>\d+)$', views.DailyRecipeAPI.as_view(),
name='dailyrecipe-detail'),
url(r'^daily-recipe/(?P<pk>\d+)$', views.DailyRecipeAPI.as_view(), name='dailyrecipe-detail'),
]

View File

@ -11,73 +11,66 @@ import recipe.models
import recipe.serializers
from utils import const
class RecipeAPI(rest_framework.generics.RetrieveUpdateAPIView):
# authentication_classes = (authentication.TokenAuthentication,
# authentication.SessionAuthentication,
# authentication.BasicAuthentication)
# permission_classes = (permissions.IsAuthenticated,)
queryset = recipe.models.Recipe.objects.all()
serializer_class = recipe.serializers.RecipeSerializer
# authentication_classes = (authentication.TokenAuthentication,
# authentication.SessionAuthentication,
# authentication.BasicAuthentication)
# permission_classes = (permissions.IsAuthenticated,)
queryset = recipe.models.Recipe.objects.all()
serializer_class = recipe.serializers.RecipeSerializer
class RecipeListAPI(rest_framework.generics.ListAPIView,
rest_framework.generics.CreateAPIView):
class RecipeListAPI(rest_framework.generics.ListAPIView, rest_framework.generics.CreateAPIView):
# authentication_classes = (authentication.TokenAuthentication,
# authentication.SessionAuthentication,
# authentication.BasicAuthentication)
# permission_classes = (permissions.IsAuthenticated,)
queryset = recipe.models.Recipe.objects.all()
serializer_class = recipe.serializers.RecipeSerializer
filterset_fields = {
'recipe_type': const.FILTER_EXACT,
'difficulty': const.FILTER_GTE_LTE,
'rate': const.FILTER_GTE_LTE,
}
# authentication_classes = (authentication.TokenAuthentication,
# authentication.SessionAuthentication,
# authentication.BasicAuthentication)
# permission_classes = (permissions.IsAuthenticated,)
queryset = recipe.models.Recipe.objects.all()
serializer_class = recipe.serializers.RecipeSerializer
filterset_fields = {
'recipe_type': const.FILTER_EXACT,
'difficulty': const.FILTER_GTE_LTE,
'rate': const.FILTER_GTE_LTE,
}
class WeekRecipeListAPI(rest_framework.generics.ListAPIView,
rest_framework.generics.CreateAPIView):
class WeekRecipeListAPI(rest_framework.generics.ListAPIView, rest_framework.generics.CreateAPIView):
queryset = recipe.models.DailyRecipe.objects.all()
serializer_class = recipe.serializers.WeekRecipeSerializer
queryset = recipe.models.DailyRecipe.objects.all()
serializer_class = recipe.serializers.WeekRecipeSerializer
def post(self, request, *args, **kwargs):
# Monday == 0 ... Sunday == 6
today = localtime()
recipes = []
for x in range(0, 7-today.weekday()):
daily_recipe, __ = recipe.models.DailyRecipe.objects.get_or_create(
date=today + datetime.timedelta(days=x)
)
daily_recipe.generate_recipe(recipes)
recipes.extend(daily_recipe.recipes.values_list('id', flat=True))
return Response(recipe.models.DailyRecipe.get_week_recipe_data(),
status=status.HTTP_201_CREATED, headers={})
def post(self, request, *args, **kwargs):
# Monday == 0 ... Sunday == 6
today = localtime()
recipes = []
for x in range(0, 7 - today.weekday()):
daily_recipe, __ = recipe.models.DailyRecipe.objects.get_or_create(date=today + datetime.timedelta(days=x))
daily_recipe.generate_recipe(recipes)
recipes.extend(daily_recipe.recipes.values_list('id', flat=True))
return Response(recipe.models.DailyRecipe.get_week_recipe_data(), status=status.HTTP_201_CREATED, headers={})
def get(self, request, *args, **kwargs):
data = recipe.models.DailyRecipe.get_week_recipe_data()
return Response(data)
def get(self, request, *args, **kwargs):
data = recipe.models.DailyRecipe.get_week_recipe_data()
return Response(data)
class DailyRecipeAPI(rest_framework.generics.RetrieveUpdateAPIView):
queryset = recipe.models.DailyRecipe.objects.all()
serializer_class = recipe.serializers.DailyRecipeSerializer
queryset = recipe.models.DailyRecipe.objects.all()
serializer_class = recipe.serializers.DailyRecipeSerializer
def post(self, request, *args, **kwargs):
daily_recipe = recipe.models.DailyRecipe.objects.get(id=kwargs['pk'])
daily_recipe.generate_recipe()
return Response(daily_recipe.serialize(), status=status.HTTP_201_CREATED,
headers={})
def post(self, request, *args, **kwargs):
daily_recipe = recipe.models.DailyRecipe.objects.get(id=kwargs['pk'])
daily_recipe.generate_recipe()
return Response(daily_recipe.serialize(), status=status.HTTP_201_CREATED, headers={})
def put(self, request, *args, **kwargs):
daily_recipe = recipe.models.DailyRecipe.objects.get(id=kwargs['pk'])
recipes = request.data.get('meat',[])
recipes.extend(request.data.get('vegetable', []))
recipes.extend(request.data.get('soup', []))
daily_recipe.recipes.set(recipe.models.Recipe.objects.filter(
id__in=recipes))
return Response(daily_recipe.serialize(), status=status.HTTP_201_CREATED,
headers={})
def put(self, request, *args, **kwargs):
daily_recipe = recipe.models.DailyRecipe.objects.get(id=kwargs['pk'])
recipes = request.data.get('meat', [])
recipes.extend(request.data.get('vegetable', []))
recipes.extend(request.data.get('soup', []))
daily_recipe.recipes.set(recipe.models.Recipe.objects.filter(id__in=recipes))
return Response(daily_recipe.serialize(), status=status.HTTP_201_CREATED, headers={})

View File

@ -2,10 +2,12 @@
# --coding:utf-8--
import os
import sys
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dsite.settings")
sys.path.insert(0, '../')
sys.path.insert(0, './')
from django.core.wsgi import get_wsgi_application
get_wsgi_application()
from http.server import BaseHTTPRequestHandler, HTTPServer
@ -36,48 +38,46 @@ KEDAI_ID = '107263380636355825'
logging.basicConfig(filename='/root/develop/log/dodo.log', level=logging.INFO)
logger = logging.getLogger('/root/develop/log/dodo.log')
mastodon_cli = Mastodon(
access_token = 'Ug_bUMWCk3RLamOnqYIytmeB0nO6aNfjdmf06mAj2bE',
api_base_url = 'https://nofan.xyz'
)
mastodon_cli = Mastodon(access_token='Ug_bUMWCk3RLamOnqYIytmeB0nO6aNfjdmf06mAj2bE', api_base_url='https://nofan.xyz')
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
redis_cli = redis.Redis(host='localhost', port=6379, decode_responses=True)
class AESCipher(object):
class AESCipher(object):
def __init__(self, key):
self.bs = AES.block_size
self.key=hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
self.key = hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
@staticmethod
def str_to_bytes(data):
u_type = type(b"".decode('utf8'))
if isinstance(data, u_type):
return data.encode('utf8')
return data
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
return s[: -ord(s[len(s) - 1 :])]
def decrypt(self, enc):
iv = enc[:AES.block_size]
iv = enc[: AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size:]))
return self._unpad(cipher.decrypt(enc[AES.block_size :]))
def decrypt_string(self, enc):
enc = base64.b64decode(enc)
return self.decrypt(enc).decode('utf8')
return self.decrypt(enc).decode('utf8')
def get_tenant_access_token(): # 获取token
token = redis_cli.get('tenant_access_token_%s' % APP_ID)
if token:
return token
return token
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
headers = {
"Content-Type": "application/json"
}
req_body = {
"app_id": APP_ID,
"app_secret": APP_SECRET
}
headers = {"Content-Type": "application/json"}
req_body = {"app_id": APP_ID, "app_secret": APP_SECRET}
data = bytes(json.dumps(req_body), encoding='utf8')
req = request.Request(url=url, data=data, headers=headers, method='POST')
@ -94,33 +94,27 @@ def get_tenant_access_token(): # 获取token
logger.error("get tenant_access_token error, code =%s", code)
return ""
token = rsp_dict.get("tenant_access_token", "")
redis_cli.set('tenant_access_token_%s' % APP_ID,
rsp_dict.get("tenant_access_token", ""),
ex=60*30)
redis_cli.set('tenant_access_token_%s' % APP_ID, rsp_dict.get("tenant_access_token", ""), ex=60 * 30)
return token
def get_group_name(chat_id):
group_name = redis_cli.get('group_name_%s' % chat_id)
if not group_name:
url = "https://open.feishu.cn/open-apis/im/v1/chats/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + get_tenant_access_token()
}
try:
resp = requests.get(url+chat_id, headers=headers)
resp_data = resp.json()
code = resp_data.get("code", -1)
if code == 0:
group_name = resp_data.get('data', {}).get('name')
redis_cli.set('group_name_%s' % chat_id,
group_name,
ex=60*60*24)
except:
# todo: log
return
return group_name
group_name = redis_cli.get('group_name_%s' % chat_id)
if not group_name:
url = "https://open.feishu.cn/open-apis/im/v1/chats/"
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + get_tenant_access_token()}
try:
resp = requests.get(url + chat_id, headers=headers)
resp_data = resp.json()
code = resp_data.get("code", -1)
if code == 0:
group_name = resp_data.get('data', {}).get('name')
redis_cli.set('group_name_%s' % chat_id, group_name, ex=60 * 60 * 24)
except:
# todo: log
return
return group_name
class RequestHandler(BaseHTTPRequestHandler):
@ -156,12 +150,12 @@ class RequestHandler(BaseHTTPRequestHandler):
event_id = obj.get('header', {}).get('event_id', '')
# 重复收到的事件不处理
if event_id and redis_cli.get(event_id):
self.response("")
return
self.response("")
return
event = obj.get("event")
if event.get("message"):
self.handle_message(event, event_id)
return
self.handle_message(event, event_id)
return
return
def handle_request_url_verify(self, post_obj):
@ -192,71 +186,77 @@ class RequestHandler(BaseHTTPRequestHandler):
text = text.lstrip()
orig_text = text
if ADD_GROUP_NAME:
group_name = get_group_name(msg.get("chat_id"))
text = '%s #%s' % (text, group_name)
group_name = get_group_name(msg.get("chat_id"))
text = '%s #%s' % (text, group_name)
else:
open_id = {"open_id": event.get("sender", {}).get(
'sender_id', {}).get('open_id')}
open_id = {"open_id": event.get("sender", {}).get('sender_id', {}).get('open_id')}
self.response("")
if orig_text.startswith('/'):
redis_cli.set(event_id, int(time.time()), ex=60*60*7)
if orig_text not in ['/last', '/del']:
flag = False
for action_ in ['/deploy ', '/菜谱 ']:
if orig_text.startswith(action_):
flag = True
break
if not flag:
self.msg_compoment(access_token, open_id, '指令错误')
return
if orig_text == '/last':
try:
statuses = mastodon_cli.account_statuses(KEDAI_ID, limit=1)
s_text = BeautifulSoup(statuses[0]['content'], 'html.parser')
self.msg_compoment(access_token, open_id,
s_text.get_text(''))
except Exception as exc:
logger.error('operation error: %s', str(exc))
elif orig_text == '/del':
try:
statuses = mastodon_cli.account_statuses(KEDAI_ID, limit=1)
mastodon_cli.status_delete(statuses[0]['id'])
s_text = BeautifulSoup(statuses[0]['content'], 'html.parser')
self.msg_compoment(access_token, open_id,
'已删除: ' + s_text.get_text(''))
except Exception as exc:
logger.error('operation error: %s', str(exc))
elif orig_text.startswith('/deploy '):
site_ = orig_text.split('/deploy ')[1]
if site_ == 'dsite':
self.msg_compoment(access_token, open_id, '🚧 %s 开始部署 🚧' % site_)
subprocess.call("/root/deploy/dsite_prepare.sh")
subprocess.run(["supervisorctl", "restart", "dsite"])
self.msg_compoment(access_token, open_id, '🎉 %s 部署成功 🎉' % site_)
else:
self.msg_compoment(access_token, open_id, '⚠️ %s 不存在 ⚠️' % site_)
elif orig_text.startswith('/菜谱 '):
content = orig_text.split('/菜谱 ')[1]
recipe_ = recipe.models.Recipe.create_from_str(content)
if recipe_:
self.msg_compoment(access_token, open_id, None,
const.LARK_WEBHOOK_MSG_TYPE_INTERACTIVE,
recipe_.construct_lart_card())
else:
self.msg_compoment(access_token, open_id, '⚠️ 创建失败 ⚠️')
return
redis_cli.set(event_id, int(time.time()), ex=60 * 60 * 7)
if orig_text not in ['/last', '/del']:
flag = False
for action_ in ['/deploy ', '/菜谱 ']:
if orig_text.startswith(action_):
flag = True
break
if not flag:
self.msg_compoment(access_token, open_id, '指令错误')
return
if orig_text == '/last':
try:
statuses = mastodon_cli.account_statuses(KEDAI_ID, limit=1)
s_text = BeautifulSoup(statuses[0]['content'], 'html.parser')
self.msg_compoment(access_token, open_id, s_text.get_text(''))
except Exception as exc:
logger.error('operation error: %s', str(exc))
elif orig_text == '/del':
try:
statuses = mastodon_cli.account_statuses(KEDAI_ID, limit=1)
mastodon_cli.status_delete(statuses[0]['id'])
s_text = BeautifulSoup(statuses[0]['content'], 'html.parser')
self.msg_compoment(access_token, open_id, '已删除: ' + s_text.get_text(''))
except Exception as exc:
logger.error('operation error: %s', str(exc))
elif orig_text.startswith('/deploy '):
site_ = orig_text.split('/deploy ')[1]
if site_ == 'dsite':
self.msg_compoment(access_token, open_id, '🚧 %s 开始部署 🚧' % site_)
subprocess.call("/root/deploy/dsite_prepare.sh")
subprocess.run(["supervisorctl", "restart", "dsite"])
self.msg_compoment(access_token, open_id, '🎉 %s 部署成功 🎉' % site_)
else:
self.msg_compoment(access_token, open_id, '⚠️ %s 不存在 ⚠️' % site_)
elif orig_text.startswith('/菜谱 '):
content = orig_text.split('/菜谱 ')[1]
recipe_ = recipe.models.Recipe.create_from_str(content)
if recipe_:
self.msg_compoment(
access_token,
open_id,
None,
const.LARK_WEBHOOK_MSG_TYPE_INTERACTIVE,
recipe_.construct_lart_card(),
)
else:
self.msg_compoment(access_token, open_id, '⚠️ 创建失败 ⚠️')
return
try:
toot_resp = mastodon_cli.status_post(text)
if toot_resp.get('id'):
self.msg_compoment(access_token, open_id, '📟 dodo 📟')
redis_cli.set(event_id, int(time.time()), ex=60*60*7)
else:
self.msg_compoment(access_token, open_id, """⚠️ didi ⚠️
toot_resp = mastodon_cli.status_post(text)
if toot_resp.get('id'):
self.msg_compoment(access_token, open_id, '📟 dodo 📟')
redis_cli.set(event_id, int(time.time()), ex=60 * 60 * 7)
else:
self.msg_compoment(
access_token,
open_id,
"""⚠️ didi ⚠️
%s
""" % json.loads(toot_resp))
"""
% json.loads(toot_resp),
)
except Exception as exc:
logger.error('send toot error: %s', str(exc))
logger.error('send toot error: %s', str(exc))
return
elif msg_type == "image":
@ -271,15 +271,10 @@ class RequestHandler(BaseHTTPRequestHandler):
def send_message(self, token, open_id, text, msg_type=None, content=None):
url = "https://open.feishu.cn/open-apis/message/v4/send/"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + token
}
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + token}
if not msg_type:
msg_type = const.LARK_WEBHOOK_MSG_TYPE_TEXT
req_body = {
"msg_type": msg_type
}
req_body = {"msg_type": msg_type}
if msg_type == const.LARK_WEBHOOK_MSG_TYPE_TEXT:
req_body['content'] = {'text': text}
elif msg_type == const.LARK_WEBHOOK_MSG_TYPE_INTERACTIVE:
@ -299,13 +294,12 @@ class RequestHandler(BaseHTTPRequestHandler):
rsp_dict = json.loads(rsp_body)
code = rsp_dict.get("code", -1)
if code != 0:
logger.error("send message error, code = %s, msg =%s",
code,
rsp_dict.get("msg", ""))
logger.error("send message error, code = %s, msg =%s", code, rsp_dict.get("msg", ""))
def msg_compoment(self, token, open_id, text, msg_type=None, content=None):
self.send_message(token, open_id, text, msg_type, content)
def run():
port = 5000
server_address = ('', port)

View File

@ -4,27 +4,23 @@ import timer.models
class OfficeHoursForm(forms.ModelForm):
class Meta:
model = timer.models.OfficeHours
fields = ['user', 'begins_at', 'ends_at']
class Meta:
model = timer.models.OfficeHours
fields = ['user', 'begins_at', 'ends_at']
def __init__(self, *args, **kwargs):
self.user = kwargs.pop('user')
return super().__init__(*args, **kwargs)
def full_clean(self):
if not self.user.is_authenticated:
raise forms.ValidationError('Invalid User.')
return
def save(self):
begins_at = timer.models.OfficeHours.parse_time_str(self.data.get('begins_at'))
user = self.user
obj = timer.models.OfficeHours.objects.create(
user=user,
begins_at=begins_at,
ends_at=timer.models.OfficeHours.get_ends_at(begins_at))
return obj
def __init__(self, *args, **kwargs):
self.user = kwargs.pop('user')
return super().__init__(*args, **kwargs)
def full_clean(self):
if not self.user.is_authenticated:
raise forms.ValidationError('Invalid User.')
return
def save(self):
begins_at = timer.models.OfficeHours.parse_time_str(self.data.get('begins_at'))
user = self.user
obj = timer.models.OfficeHours.objects.create(
user=user, begins_at=begins_at, ends_at=timer.models.OfficeHours.get_ends_at(begins_at)
)
return obj

View File

@ -9,9 +9,7 @@ class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
dependencies = [migrations.swappable_dependency(settings.AUTH_USER_MODEL)]
operations = [
migrations.CreateModel(
@ -22,5 +20,5 @@ class Migration(migrations.Migration):
('ends_at', models.DateTimeField()),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
),
)
]

View File

@ -6,22 +6,22 @@ import datetime
class OfficeHours(models.Model):
begins_at = models.DateTimeField()
ends_at = models.DateTimeField()
user = models.ForeignKey(User, on_delete=django.db.models.deletion.CASCADE)
begins_at = models.DateTimeField()
ends_at = models.DateTimeField()
user = models.ForeignKey(User, on_delete=django.db.models.deletion.CASCADE)
@classmethod
def parse_time_str(cls, time_srt):
return datetime.datetime.strptime(time_srt, '%Y-%m-%d %H:%M')
@classmethod
def parse_time_str(cls, time_srt):
return datetime.datetime.strptime(time_srt, '%Y-%m-%d %H:%M')
@classmethod
def get_ends_at(cls, begins_at):
return begins_at + datetime.timedelta(hours=9.5)
@classmethod
def get_ends_at(cls, begins_at):
return begins_at + datetime.timedelta(hours=9.5)
@property
def get_begins_at_str(self):
return localtime(self.begins_at).strftime('%Y-%m-%d %H:%M')
@property
def get_begins_at_str(self):
return localtime(self.begins_at).strftime('%Y-%m-%d %H:%M')
@property
def get_ends_at_str(self):
return localtime(self.ends_at).strftime('%H:%M')
@property
def get_ends_at_str(self):
return localtime(self.ends_at).strftime('%H:%M')

View File

@ -5,17 +5,17 @@ import timer.models
class OfficeHoursSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = timer.models.OfficeHours
class Meta:
model = timer.models.OfficeHours
class UserSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = User
fields = ('url', 'username', 'email', 'groups')
class Meta:
model = User
fields = ('url', 'username', 'email', 'groups')
class GroupSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = Group
fields = ('url', 'name')
class Meta:
model = Group
fields = ('url', 'name')

View File

@ -1,4 +1,5 @@
from django.conf.urls import include, url
# from django.core.urlresolvers import reverse
from django.urls import path
from rest_framework import routers

View File

@ -19,59 +19,57 @@ from django.urls import reverse
class OfficeHoursAPI(CreateAPIView):
authentication_classes = (authentication.TokenAuthentication,
authentication.SessionAuthentication,
authentication.BasicAuthentication)
permission_classes = (permissions.IsAuthenticated,)
authentication_classes = (
authentication.TokenAuthentication,
authentication.SessionAuthentication,
authentication.BasicAuthentication,
)
permission_classes = (permissions.IsAuthenticated,)
queryset = timer.models.OfficeHours.objects.all()
queryset = timer.models.OfficeHours.objects.all()
def create(self, request, *args, **kwargs):
begins_at = request.data.get('begins_at')
if not begins_at:
raise
try:
begins_at = timer.models.OfficeHours.parse_time_str(begins_at)
ends_at = timer.models.OfficeHours.get_ends_at(begins_at)
oh, __ = timer.models.OfficeHours.objects.get_or_create(
begins_at=begins_at,
ends_at=ends_at,
user=request.user)
except ValueError:
raise
oh.refresh_from_db()
resp_data = {
'begins_at': oh.get_begins_at_str,
'ends_at': oh.get_ends_at_str,
}
def create(self, request, *args, **kwargs):
begins_at = request.data.get('begins_at')
if not begins_at:
raise
try:
begins_at = timer.models.OfficeHours.parse_time_str(begins_at)
ends_at = timer.models.OfficeHours.get_ends_at(begins_at)
oh, __ = timer.models.OfficeHours.objects.get_or_create(
begins_at=begins_at, ends_at=ends_at, user=request.user
)
except ValueError:
raise
oh.refresh_from_db()
resp_data = {'begins_at': oh.get_begins_at_str, 'ends_at': oh.get_ends_at_str}
return Response(resp_data, status=status.HTTP_201_CREATED)
return Response(resp_data, status=status.HTTP_201_CREATED)
class OfficeHoursFormView(django.views.generic.FormView):
template_name = 'index.html'
form_class = timer.forms.OfficeHoursForm
template_name = 'index.html'
form_class = timer.forms.OfficeHoursForm
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def post(self, request, *args, **kwargs):
form = self.form_class(**self.get_form_kwargs())
try:
form.is_valid()
form.save()
except django.forms.ValidationError as e:
return JsonResponse({'error': e.message}, status=status.HTTP_400_BAD_REQUEST)
def post(self, request, *args, **kwargs):
form = self.form_class(**self.get_form_kwargs())
try:
form.is_valid()
form.save()
except django.forms.ValidationError as e:
return JsonResponse({'error': e.message}, status=status.HTTP_400_BAD_REQUEST)
return super().post(request, *args, **kwargs)
return super().post(request, *args, **kwargs)
def get_success_url(self):
return reverse('office-hours-page')
def get_success_url(self):
return reverse('office-hours-page')
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
today_oh = timer.models.OfficeHours.objects.filter(begins_at__date=localtime().date()).last()
context['today_oh'] = today_oh
return context
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
today_oh = timer.models.OfficeHours.objects.filter(begins_at__date=localtime().date()).last()
context['today_oh'] = today_oh
return context

View File

@ -6,113 +6,113 @@ from django.utils import timezone
def timestamp_of(d):
if hasattr(d, 'isoformat'):
return calendar.timegm(d.utctimetuple())
return None
if hasattr(d, 'isoformat'):
return calendar.timegm(d.utctimetuple())
return None
def now():
return timezone.localtime(timezone.now())
return timezone.localtime(timezone.now())
def midnight():
return now().replace(hour=0, minute=0, second=0, microsecond=0)
return now().replace(hour=0, minute=0, second=0, microsecond=0)
def is_today(tme):
if day_of(tme) != day_of(midnight()):
return False
return True
if day_of(tme) != day_of(midnight()):
return False
return True
def current_time_n(n):
return now() - timedelta(days=n)
return now() - timedelta(days=n)
def day_n(n):
"""Midnight of N days ago."""
return midnight() - timedelta(days=n)
"""Midnight of N days ago."""
return midnight() - timedelta(days=n)
def day_of(d):
"""Returns date part."""
return str(d.date())
"""Returns date part."""
return str(d.date())
def midnight_of(d):
"""
Args:
d : datetime object
Return:
Midnight of datetime.
"""
d = timezone.localtime(d)
return d.replace(hour=0, minute=0, second=0, microsecond=0)
"""
Args:
d : datetime object
Return:
Midnight of datetime.
"""
d = timezone.localtime(d)
return d.replace(hour=0, minute=0, second=0, microsecond=0)
def day_start(date):
""" 返回 date 这天的开始时间
"""返回 date 这天的开始时间
Args:
date: `Datetime` 对象
Args:
date: `Datetime` 对象
Returns:
返回 date 这天的开始时间0 `Datetime` 对象
"""
return timezone.localtime(date).replace(hour=0, minute=0, second=0,
microsecond=0)
Returns:
返回 date 这天的开始时间0 `Datetime` 对象
"""
return timezone.localtime(date).replace(hour=0, minute=0, second=0, microsecond=0)
def week_start(date):
""" 返回 date 这天所在周的开始时间
"""返回 date 这天所在周的开始时间
Args:
date: `Datetime` 对象
Args:
date: `Datetime` 对象
Returns:
返回 date 这天所在周开始时间周一`Datetime` 对象
"""
return timezone.localtime(date) + dateutil.relativedelta.relativedelta(
weekday=dateutil.relativedelta.MO(-1), hour=0, minute=0, second=0,
microsecond=0)
Returns:
返回 date 这天所在周开始时间周一`Datetime` 对象
"""
return timezone.localtime(date) + dateutil.relativedelta.relativedelta(
weekday=dateutil.relativedelta.MO(-1), hour=0, minute=0, second=0, microsecond=0
)
def month_start(d):
""" 返回 date 这天所在月的开始时间
"""返回 date 这天所在月的开始时间
Args:
date: `Datetime` 对象
Args:
date: `Datetime` 对象
Returns:
返回 date 这天所在月的开始时间1`Datetime` 对象
"""
return timezone.localtime(d) + dateutil.relativedelta.relativedelta(
day=1, hour=0, minute=0, second=0, microsecond=0)
Returns:
返回 date 这天所在月的开始时间1`Datetime` 对象
"""
return timezone.localtime(d) + dateutil.relativedelta.relativedelta(
day=1, hour=0, minute=0, second=0, microsecond=0
)
def month_end(d):
"""返回 date 这天所在月的结束时间,即最后一天的 23:59:59
"""返回 date 这天所在月的结束时间,即最后一天的 23:59:59
Args:
data: `Datetime` 对象
Args:
data: `Datetime` 对象
Returns:
返回 date 这天所在月的最后一天的 23:59:59 的时间
"""
start = month_start(d)
month_days = calendar.monthrange(start.year, start.month)[1]
return start + timedelta(days=month_days, seconds=-1)
Returns:
返回 date 这天所在月的最后一天的 23:59:59 的时间
"""
start = month_start(d)
month_days = calendar.monthrange(start.year, start.month)[1]
return start + timedelta(days=month_days, seconds=-1)
def year_start(d):
""" 返回 date 这天所在年的开始时间
"""返回 date 这天所在年的开始时间
Args:
date: `Datetime` 对象
Returns:
返回 date 这天所在年的开始时间1 1`Datetime` 对象
"""
return timezone.localtime(d) + dateutil.relativedelta.relativedelta(
month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
Args:
date: `Datetime` 对象
Returns:
返回 date 这天所在年的开始时间1 1`Datetime` 对象
"""
return timezone.localtime(d) + dateutil.relativedelta.relativedelta(
month=1, day=1, hour=0, minute=0, second=0, microsecond=0
)

View File

@ -2,20 +2,16 @@ DAILY_RECIPE_MEAL_TYPE_BREAKFAST = 'breakfast'
DAILY_RECIPE_MEAL_TYPE_LUNCH = 'lunch'
DAILY_RECIPE_MEAL_TYPE_SUPPER = 'supper'
DAILY_RECIPE_MEAL_TYPE_CHOICE = [
DAILY_RECIPE_MEAL_TYPE_BREAKFAST,
DAILY_RECIPE_MEAL_TYPE_LUNCH,
DAILY_RECIPE_MEAL_TYPE_SUPPER,
DAILY_RECIPE_MEAL_TYPE_BREAKFAST,
DAILY_RECIPE_MEAL_TYPE_LUNCH,
DAILY_RECIPE_MEAL_TYPE_SUPPER,
]
RECIPE_TYPE_MEAT = 'meat'
RECIPE_TYPE_VEGETABLE = 'vegetable'
RECIPE_TYPE_SOUP = 'soup'
RECIPE_TYPE_CHOICE = [
RECIPE_TYPE_MEAT,
RECIPE_TYPE_VEGETABLE,
RECIPE_TYPE_SOUP,
]
RECIPE_TYPE_CHOICE = [RECIPE_TYPE_MEAT, RECIPE_TYPE_VEGETABLE, RECIPE_TYPE_SOUP]
FILTER_EXACT = ['exact']
FILTER_GTE_LTE = ['exact', 'gte', 'gt', 'lte', 'lt']

View File

@ -2,6 +2,7 @@
import sys
import os
from imp import reload
sys.path.insert(0, os.path.abspath('..'))
sys.path.append('/Users/ching/develop/dsite')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "../dsite.settings")
@ -12,14 +13,9 @@ import git
import ipdb
if __name__ == '__main__':
# def notify():
# def notify():
repo = git.Repo(search_parent_directories=True)
commit = repo.head.commit
rev, branch = commit.name_rev.split(' ')
msg = 'rev: %s\n\nauther: %s\n\nbranch: %s\n\nmessage: %s' % (
rev,
commit.author.name,
branch,
commit.summary
)
msg = 'rev: %s\n\nauther: %s\n\nbranch: %s\n\nmessage: %s' % (rev, commit.author.name, branch, commit.summary)
print(utils.lark.request({'text': msg}))

View File

@ -13,25 +13,26 @@ from django.utils.timezone import now
def gen_sign(timestamp, secret):
# 拼接timestamp和secret
string_to_sign = '{}\n{}'.format(timestamp, secret)
hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()
# 拼接timestamp和secret
string_to_sign = '{}\n{}'.format(timestamp, secret)
hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()
# 对结果进行base64处理
sign = base64.b64encode(hmac_code).decode('utf-8')
# 对结果进行base64处理
sign = base64.b64encode(hmac_code).decode('utf-8')
return sign
return sign
def request(content, msg_type=const.LARK_WEBHOOK_MSG_TYPE_TEXT):
""" content: {'text': 'xxxxx}
"""
timestamp = utils.timestamp_of(now())
data = {
"timestamp": timestamp,
"sign": gen_sign(timestamp, settings.LARK_WEBHOOK_SECRET),
"msg_type": msg_type,
"content": content}
resp = requests.post(settings.LARK_WEBHOOK_URL, data=json.dumps(data))
if resp.status_code == 200 and resp.json().get('StatusCode') == 0:
return True
return False
"""content: {'text': 'xxxxx}"""
timestamp = utils.timestamp_of(now())
data = {
"timestamp": timestamp,
"sign": gen_sign(timestamp, settings.LARK_WEBHOOK_SECRET),
"msg_type": msg_type,
"content": content,
}
resp = requests.post(settings.LARK_WEBHOOK_URL, data=json.dumps(data))
if resp.status_code == 200 and resp.json().get('StatusCode') == 0:
return True
return False