apprise-notify-center/backend/notify_service.py
2026-02-09 03:42:51 +00:00

241 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import urllib.parse
from datetime import datetime
from typing import List, Optional, Dict, Any

import apprise
from sqlalchemy.ext.asyncio import AsyncSession

from backend import crud
from backend.models import Channel
logger = logging.getLogger(__name__)


class NotifyService:
    """Send notifications to configured channels through Apprise or Bark.

    A channel object is expected to expose ``id``, ``name``, ``type``,
    ``is_active`` and a ``config`` mapping; those are the only attributes
    this class reads. Persistence goes through the ``crud`` module.
    """

    # Mapping from this service's priority names to Bark's ``level`` query
    # parameter. Priorities not listed here send no ``level`` at all.
    # NOTE(review): "high" -> "active" is kept from the original code; confirm
    # that a stronger Bark level was not intended.
    _BARK_LEVELS = {"high": "active", "urgent": "critical"}

    # Headers sent with every Bark request (kept verbatim from the original).
    _BARK_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }

    def __init__(self):
        # Not used by the methods below (each send builds a fresh Apprise
        # object); kept so existing callers that read ``apobj`` still work.
        self.apobj = apprise.Apprise()

    def _build_apprise_url(self, channel: "Channel") -> Optional[str]:
        """Build the Apprise URL for *channel* from its type and config.

        Returns:
            The URL string, the marker ``"__bark__"`` for a correctly
            configured Bark channel (Bark is sent directly by
            ``send_notification``, not through Apprise), or ``None`` when
            the type is unknown or required settings are missing.
        """
        config = channel.config or {}
        channel_type = channel.type

        if channel_type in ("discord", "slack"):
            # The webhook URL is handed to Apprise unchanged.
            webhook_url = config.get("webhook_url")
            return str(webhook_url) if webhook_url else None

        if channel_type == "telegram":
            bot_token = config.get("bot_token")
            chat_id = config.get("chat_id")
            if bot_token and chat_id:
                return f"tgram://{bot_token}/{chat_id}"
            return None

        if channel_type == "email":
            smtp_host = config.get("smtp_host")
            smtp_port = config.get("smtp_port") or 587
            username = config.get("username")
            password = config.get("password")
            to_email = config.get("to_email")
            # Without a host the URL would be "mailtos://u:p@:587", so the
            # host is required just like the credentials and the recipient.
            if not (smtp_host and username and password and to_email):
                return None
            # Credentials routinely contain "@", ":" or "/", which would
            # otherwise be read as URL delimiters.
            user = urllib.parse.quote(str(username), safe="")
            secret = urllib.parse.quote(str(password), safe="")
            # Keep "@" and "," readable; escape the rest (e.g. "+", which a
            # query-string parser would turn into a space).
            recipients = urllib.parse.quote(str(to_email), safe="@,")
            return (
                f"mailtos://{user}:{secret}@{smtp_host}:{smtp_port}"
                f"?to={recipients}"
            )

        if channel_type == "webhook":
            url = config.get("url")
            if not url:
                return None
            url = str(url)
            # Only the leading scheme is stripped; "http://" appearing later
            # (for example inside a query string) must survive.
            scheme, separator, remainder = url.partition("://")
            if separator and scheme.lower() == "https":
                # "jsons://" is the TLS variant; "json://" would silently
                # downgrade an HTTPS endpoint to plain HTTP.
                return f"jsons://{remainder}"
            if separator and scheme.lower() == "http":
                return f"json://{remainder}"
            return f"json://{url}"

        if channel_type == "apprise":
            # Native Apprise URL supplied by the user.
            return config.get("url") or None

        if channel_type == "bark":
            if config.get("base_url") and config.get("device_key"):
                # Marker only: the actual request is built in
                # ``send_notification``.
                return "__bark__"
            return None

        return None

    @staticmethod
    def _build_bark_url(
        base_url: str,
        device_key: str,
        title: Optional[str],
        body: str
    ) -> str:
        """Return the Bark push URL ``<base>/<key>[/<title>]/<body>``.

        Title and body are percent-encoded as single path segments.
        """
        base = (base_url or "").rstrip("/")
        # NOTE(review): HTTPS is deliberately downgraded to HTTP here. The
        # original author forced HTTP because of SSL problems on their Bark
        # server. This exposes the device key and message on the wire and
        # should be revisited once the server certificate is fixed.
        if base.startswith("https://"):
            base = "http://" + base[len("https://"):]
        elif not base.startswith("http://"):
            base = "http://" + base

        encoded_body = urllib.parse.quote(str(body), safe='')
        encoded_title = urllib.parse.quote(str(title), safe='') if title else ""
        if encoded_title:
            return f"{base}/{device_key}/{encoded_title}/{encoded_body}"
        return f"{base}/{device_key}/{encoded_body}"

    @staticmethod
    def _send_bark(bark_url: str, params: Dict[str, str]) -> Optional[str]:
        """Perform the blocking Bark request.

        Returns ``None`` on HTTP 200, otherwise a description of the failure.
        """
        # Imported lazily, as in the original, so the module still loads
        # when only Apprise channels are used.
        import requests

        try:
            response = requests.get(
                bark_url,
                params=params,
                timeout=10,
                headers=NotifyService._BARK_HEADERS,
            )
        except requests.RequestException as exc:
            logger.warning("Bark error: %s", exc)
            return f"Bark request failed: {exc}"

        logger.info(
            "Bark response: %s, %s", response.status_code, response.text[:100]
        )
        if response.status_code != 200:
            return f"Bark server returned HTTP {response.status_code}"
        return None

    async def _dispatch(
        self,
        channel: "Channel",
        apprise_url: str,
        title: Optional[str],
        body: str,
        priority: str
    ) -> Optional[str]:
        """Deliver one message; return ``None`` on success, else an error text."""
        if channel.type == "bark":
            config = channel.config or {}
            bark_url = self._build_bark_url(
                config.get("base_url", ""),
                config.get("device_key", ""),
                title,
                body,
            )
            level = self._BARK_LEVELS.get(priority)
            params = {"level": level} if level else {}
            # ``requests`` is synchronous; run it on a worker thread so the
            # event loop keeps serving other requests while it waits.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None, self._send_bark, bark_url, params
            )

        apobj = apprise.Apprise()
        if not apobj.add(apprise_url):
            return "Apprise rejected the URL configured for this channel"

        # The title is folded into the body in bold rather than passed as a
        # separate Apprise title.
        message = f"**{title}**\n\n{body}" if title else body
        delivered = await apobj.async_notify(body=message)
        return None if delivered else "Failed to send notification"

    async def _record_failure(
        self,
        db: "AsyncSession",
        channel: "Channel",
        channel_id: int,
        notification_id: Any,
        error_msg: str
    ) -> Dict[str, Any]:
        """Mark the notification record as failed and build the result dict."""
        await crud.update_notification_status(
            db, notification_id, "failed", error_msg
        )
        return {
            "channel": channel.name,
            "channel_id": channel_id,
            "status": "failed",
            "notification_id": notification_id,
            "error_msg": error_msg
        }

    async def send_notification(
        self,
        db: "AsyncSession",
        channel_id: int,
        title: Optional[str],
        body: str,
        priority: str = "normal"
    ) -> Dict[str, Any]:
        """Send a single notification to the channel with id *channel_id*.

        Args:
            db: Session passed through to every ``crud`` call.
            channel_id: Id of the target channel.
            title: Optional title.
            body: Message text.
            priority: Stored with the record; for Bark, ``"high"`` and
                ``"urgent"`` also select a ``level`` parameter.

        Returns:
            A dict with ``status`` set to ``"sent"``, ``"failed"`` or
            ``"skipped"``. ``channel_id`` is always present. ``channel``
            (the name) is present whenever the channel exists.
            ``notification_id`` is present once a record was created, and
            ``error_msg`` whenever the status is not ``"sent"``.

        Delivery errors are caught and reported as a ``"failed"`` result
        rather than raised.
        """
        channel = await crud.get_channel(db, channel_id)
        if not channel:
            return {
                "channel_id": channel_id,
                "status": "failed",
                "error_msg": "Channel not found"
            }
        if not channel.is_active:
            # Inactive channels are skipped without creating a record.
            return {
                "channel": channel.name,
                "channel_id": channel_id,
                "status": "skipped",
                "error_msg": "Channel is inactive"
            }

        # Create the record first so failures below are recorded too.
        notification = await crud.create_notification(
            db, channel_id, {
                "title": title,
                "body": body,
                "priority": priority,
                "status": "pending"
            }
        )

        apprise_url = self._build_apprise_url(channel)
        if not apprise_url:
            return await self._record_failure(
                db, channel, channel_id, notification.id,
                f"Invalid configuration for channel type: {channel.type}",
            )

        try:
            error_msg = await self._dispatch(
                channel, apprise_url, title, body, priority
            )
            if error_msg is None:
                # NOTE(review): naive UTC timestamp kept as in the original;
                # confirm the column type before switching to an aware one.
                await crud.update_notification_status(
                    db, notification.id, "sent", sent_at=datetime.utcnow()
                )
                return {
                    "channel": channel.name,
                    "channel_id": channel_id,
                    "status": "sent",
                    "notification_id": notification.id
                }
        except Exception as exc:
            # Boundary handler: one misbehaving channel must not abort a
            # batch, so the error is logged and turned into a result.
            logger.exception(
                "Sending notification to channel %s failed", channel_id
            )
            error_msg = str(exc) or type(exc).__name__

        return await self._record_failure(
            db, channel, channel_id, notification.id, error_msg
        )

    async def send_to_channels(
        self,
        db: "AsyncSession",
        channels: Optional[List[str]],
        tags: Optional[List[str]],
        title: Optional[str],
        body: str,
        priority: str = "normal"
    ) -> List[Dict[str, Any]]:
        """Send one notification to channels chosen by name and/or by tag.

        Channels named in *channels* come first, followed by channels
        returned for *tags*. Each channel receives the message at most
        once, even if it is named twice or also matches a tag. Names that
        do not resolve to a channel are logged and skipped.

        Returns:
            One ``send_notification`` result dict per targeted channel, in
            sending order.
        """
        # Keyed by channel id; dict insertion order gives the send order and
        # ``setdefault`` keeps the first occurrence of each channel.
        targets: Dict[Any, Any] = {}

        for channel_name in channels or []:
            channel = await crud.get_channel_by_name(db, channel_name)
            if not channel:
                logger.warning("Channel %r not found; skipping", channel_name)
                continue
            targets.setdefault(channel.id, channel)

        if tags:
            for channel in await crud.get_channels_by_tags(db, tags):
                targets.setdefault(channel.id, channel)

        # Sent one at a time: every call shares the same ``db`` session.
        results = []
        for target_id in targets:
            results.append(
                await self.send_notification(
                    db, target_id, title, body, priority
                )
            )
        return results