190 lines
6.3 KiB
Python
190 lines
6.3 KiB
Python
from typing import List, Optional, Dict, Any
|
|
from datetime import datetime
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
import apprise
|
|
from backend import crud
|
|
from backend.models import Channel
|
|
|
|
class NotifyService:
|
|
def __init__(self):
|
|
self.apobj = apprise.Apprise()
|
|
|
|
def _build_apprise_url(self, channel: Channel) -> Optional[str]:
|
|
"""根据通道类型构建 apprise URL"""
|
|
config = channel.config or {}
|
|
|
|
if channel.type == "discord":
|
|
webhook_url = config.get("webhook_url", "")
|
|
if webhook_url:
|
|
return f"{webhook_url}"
|
|
|
|
elif channel.type == "telegram":
|
|
bot_token = config.get("bot_token", "")
|
|
chat_id = config.get("chat_id", "")
|
|
if bot_token and chat_id:
|
|
return f"tgram://{bot_token}/{chat_id}"
|
|
|
|
elif channel.type == "email":
|
|
smtp_host = config.get("smtp_host", "")
|
|
smtp_port = config.get("smtp_port", 587)
|
|
username = config.get("username", "")
|
|
password = config.get("password", "")
|
|
to_email = config.get("to_email", "")
|
|
|
|
if username and password and to_email:
|
|
return f"mailtos://{username}:{password}@{smtp_host}:{smtp_port}?to={to_email}"
|
|
|
|
elif channel.type == "slack":
|
|
webhook_url = config.get("webhook_url", "")
|
|
if webhook_url:
|
|
return f"{webhook_url}"
|
|
|
|
elif channel.type == "webhook":
|
|
url = config.get("url", "")
|
|
if url:
|
|
return f"json://{url.replace('https://', '').replace('http://', '')}"
|
|
|
|
# 支持 apprise 原生 URL 格式
|
|
elif channel.type == "apprise":
|
|
return config.get("url", "")
|
|
|
|
return None
|
|
|
|
async def send_notification(
|
|
self,
|
|
db: AsyncSession,
|
|
channel_id: int,
|
|
title: Optional[str],
|
|
body: str,
|
|
priority: str = "normal"
|
|
) -> Dict[str, Any]:
|
|
"""发送单条通知到指定通道"""
|
|
channel = await crud.get_channel(db, channel_id)
|
|
if not channel:
|
|
return {
|
|
"channel_id": channel_id,
|
|
"status": "failed",
|
|
"error_msg": "Channel not found"
|
|
}
|
|
|
|
if not channel.is_active:
|
|
return {
|
|
"channel": channel.name,
|
|
"channel_id": channel_id,
|
|
"status": "skipped",
|
|
"error_msg": "Channel is inactive"
|
|
}
|
|
|
|
# 创建通知记录
|
|
notification = await crud.create_notification(
|
|
db, channel_id, {
|
|
"title": title,
|
|
"body": body,
|
|
"priority": priority,
|
|
"status": "pending"
|
|
}
|
|
)
|
|
|
|
# 构建 apprise URL
|
|
apprise_url = self._build_apprise_url(channel)
|
|
if not apprise_url:
|
|
error_msg = f"Invalid configuration for channel type: {channel.type}"
|
|
await crud.update_notification_status(
|
|
db, notification.id, "failed", error_msg
|
|
)
|
|
return {
|
|
"channel": channel.name,
|
|
"channel_id": channel_id,
|
|
"status": "failed",
|
|
"notification_id": notification.id,
|
|
"error_msg": error_msg
|
|
}
|
|
|
|
try:
|
|
# 发送通知
|
|
apobj = apprise.Apprise()
|
|
apobj.add(apprise_url)
|
|
|
|
# 构建消息
|
|
message = body
|
|
if title:
|
|
message = f"**{title}**\n\n{body}"
|
|
|
|
# 发送
|
|
result = apobj.notify(body=message)
|
|
|
|
if result:
|
|
await crud.update_notification_status(
|
|
db, notification.id, "sent", sent_at=datetime.utcnow()
|
|
)
|
|
return {
|
|
"channel": channel.name,
|
|
"channel_id": channel_id,
|
|
"status": "sent",
|
|
"notification_id": notification.id
|
|
}
|
|
else:
|
|
error_msg = "Failed to send notification"
|
|
await crud.update_notification_status(
|
|
db, notification.id, "failed", error_msg
|
|
)
|
|
return {
|
|
"channel": channel.name,
|
|
"channel_id": channel_id,
|
|
"status": "failed",
|
|
"notification_id": notification.id,
|
|
"error_msg": error_msg
|
|
}
|
|
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
await crud.update_notification_status(
|
|
db, notification.id, "failed", error_msg
|
|
)
|
|
return {
|
|
"channel": channel.name,
|
|
"channel_id": channel_id,
|
|
"status": "failed",
|
|
"notification_id": notification.id,
|
|
"error_msg": error_msg
|
|
}
|
|
|
|
async def send_to_channels(
|
|
self,
|
|
db: AsyncSession,
|
|
channels: Optional[List[str]],
|
|
tags: Optional[List[str]],
|
|
title: Optional[str],
|
|
body: str,
|
|
priority: str = "normal"
|
|
) -> List[Dict[str, Any]]:
|
|
"""批量发送通知到多个通道或按标签发送"""
|
|
results = []
|
|
target_channels = []
|
|
|
|
# 按名称获取通道
|
|
if channels:
|
|
for channel_name in channels:
|
|
channel = await crud.get_channel_by_name(db, channel_name)
|
|
if channel:
|
|
target_channels.append(channel)
|
|
|
|
# 按标签获取通道
|
|
if tags:
|
|
tagged_channels = await crud.get_channels_by_tags(db, tags)
|
|
# 合并去重
|
|
existing_ids = {c.id for c in target_channels}
|
|
for channel in tagged_channels:
|
|
if channel.id not in existing_ids:
|
|
target_channels.append(channel)
|
|
existing_ids.add(channel.id)
|
|
|
|
# 发送通知
|
|
for channel in target_channels:
|
|
result = await self.send_notification(
|
|
db, channel.id, title, body, priority
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|