# apprise-notify-center/backend/notify_service.py
#
# Listing metadata: 190 lines, 6.3 KiB, Python

from datetime import datetime
from typing import List, Optional, Dict, Any
from urllib.parse import quote

import apprise
from sqlalchemy.ext.asyncio import AsyncSession

from backend import crud
from backend.models import Channel
class NotifyService:
    """Send notifications through Apprise and record each attempt via ``crud``.

    A channel's ``type`` and ``config`` are translated into an Apprise URL,
    the message is delivered with a throw-away ``apprise.Apprise`` object, and
    the outcome is written back with ``crud.update_notification_status``.
    """

    def __init__(self):
        # Retained for backward compatibility. Each send builds its own
        # Apprise object so URLs from one channel never leak into another.
        self.apobj = apprise.Apprise()

    def _build_apprise_url(self, channel: "Channel") -> Optional[str]:
        """Build the Apprise URL for a channel from its type and config.

        Args:
            channel: Object exposing ``type`` (str) and ``config`` (dict or
                ``None``).

        Returns:
            The Apprise URL, or ``None`` when the channel type is unknown or
            a required config value is missing or empty.
        """
        config = channel.config or {}
        channel_type = channel.type

        if channel_type in ("discord", "slack"):
            # The webhook URL is handed to Apprise unchanged.
            webhook_url = config.get("webhook_url")
            return str(webhook_url) if webhook_url else None

        if channel_type == "telegram":
            bot_token = config.get("bot_token")
            chat_id = config.get("chat_id")
            if bot_token and chat_id:
                return f"tgram://{bot_token}/{chat_id}"
            return None

        if channel_type == "email":
            smtp_host = config.get("smtp_host")
            smtp_port = config.get("smtp_port") or 587
            username = config.get("username")
            password = config.get("password")
            to_email = config.get("to_email")
            # Without a host the URL has no authority part at all.
            if not (smtp_host and username and password and to_email):
                return None
            # Percent-encode the credentials: a raw "@", ":" or "/" (e.g. a
            # username that is an e-mail address) would otherwise be read as
            # URL structure.
            user = quote(str(username), safe="")
            secret = quote(str(password), safe="")
            # Keep "@" and "," readable, but escape "+" and friends so the
            # query-string parser does not rewrite them.
            recipient = quote(str(to_email), safe="@,")
            return (
                f"mailtos://{user}:{secret}@{smtp_host}:{smtp_port}"
                f"?to={recipient}"
            )

        if channel_type == "webhook":
            url = config.get("url")
            if not url:
                return None
            url = str(url)
            lowered = url.lower()
            # Map the scheme instead of discarding it: https -> jsons://
            # (TLS), http -> json://. Only the leading scheme is removed, so
            # a "http://" inside the path or query string is left intact.
            if lowered.startswith("https://"):
                return f"jsons://{url[len('https://'):]}"
            if lowered.startswith("http://"):
                return f"json://{url[len('http://'):]}"
            return f"json://{url}"

        if channel_type == "apprise":
            # Native Apprise URL stored as-is; empty values become None.
            return config.get("url") or None

        return None

    async def send_notification(
        self,
        db: "AsyncSession",
        channel_id: int,
        title: Optional[str],
        body: str,
        priority: str = "normal",
    ) -> Dict[str, Any]:
        """Send one notification to the channel identified by ``channel_id``.

        Args:
            db: Session passed through to the ``crud`` helpers.
            channel_id: Identifier looked up with ``crud.get_channel``.
            title: Optional title; when truthy it is prepended to the body
                as ``**title**`` followed by a blank line.
            body: Message text.
            priority: Stored on the notification record.

        Returns:
            A dict with ``channel_id`` and ``status`` (``"sent"``,
            ``"failed"`` or ``"skipped"``). ``channel`` is present whenever
            the channel was found, ``notification_id`` once a record has been
            created, and ``error_msg`` for every non-sent outcome.
        """
        channel = await crud.get_channel(db, channel_id)
        if not channel:
            return {
                "channel_id": channel_id,
                "status": "failed",
                "error_msg": "Channel not found",
            }
        if not channel.is_active:
            return {
                "channel": channel.name,
                "channel_id": channel_id,
                "status": "skipped",
                "error_msg": "Channel is inactive",
            }

        # Create the notification record before attempting delivery.
        notification = await crud.create_notification(
            db,
            channel_id,
            {
                "title": title,
                "body": body,
                "priority": priority,
                "status": "pending",
            },
        )

        # error_msg stays None only when delivery succeeded.
        error_msg: Optional[str]
        apprise_url = self._build_apprise_url(channel)
        if not apprise_url:
            error_msg = (
                f"Invalid configuration for channel type: {channel.type}"
            )
        else:
            message = f"**{title}**\n\n{body}" if title else body
            # Only the Apprise calls are guarded, so a failure while
            # recording the outcome cannot relabel a delivered message.
            try:
                apobj = apprise.Apprise()
                apobj.add(apprise_url)
                # async_notify() is Apprise's entry point for asynchronous
                # callers; the synchronous notify() would block the event
                # loop for the duration of the network round trip.
                delivered = await apobj.async_notify(body=message)
                error_msg = (
                    None if delivered else "Failed to send notification"
                )
            except Exception as exc:
                # Service boundary: report any plugin error to the caller.
                error_msg = str(exc)

        if error_msg is None:
            await crud.update_notification_status(
                db, notification.id, "sent", sent_at=datetime.utcnow()
            )
            return {
                "channel": channel.name,
                "channel_id": channel_id,
                "status": "sent",
                "notification_id": notification.id,
            }

        await crud.update_notification_status(
            db, notification.id, "failed", error_msg
        )
        return {
            "channel": channel.name,
            "channel_id": channel_id,
            "status": "failed",
            "notification_id": notification.id,
            "error_msg": error_msg,
        }

    async def send_to_channels(
        self,
        db: "AsyncSession",
        channels: Optional[List[str]],
        tags: Optional[List[str]],
        title: Optional[str],
        body: str,
        priority: str = "normal",
    ) -> List[Dict[str, Any]]:
        """Send one notification to channels selected by name and/or tag.

        Channels named in ``channels`` come first, followed by channels
        returned by ``crud.get_channels_by_tags``. Each channel is notified
        at most once even when it is named repeatedly or matches both
        selectors. Names that resolve to no channel are skipped and produce
        no result entry.

        Returns:
            One ``send_notification`` result dict per targeted channel, in
            send order.
        """
        targets = []
        seen_ids = set()

        def remember(channel) -> None:
            # De-duplicate by id across both selectors.
            if channel.id not in seen_ids:
                seen_ids.add(channel.id)
                targets.append(channel)

        # Resolve channels by name.
        for channel_name in channels or []:
            channel = await crud.get_channel_by_name(db, channel_name)
            if channel:
                remember(channel)

        # Resolve channels by tag.
        if tags:
            for channel in await crud.get_channels_by_tags(db, tags):
                remember(channel)

        # Sends run one after another and all share the same session.
        results = []
        for channel in targets:
            results.append(
                await self.send_notification(
                    db, channel.id, title, body, priority
                )
            )
        return results