241 lines
8.7 KiB
Python
241 lines
8.7 KiB
Python
import asyncio
import functools
import logging
import urllib.parse
from datetime import datetime
from typing import List, Optional, Dict, Any

import apprise
from sqlalchemy.ext.asyncio import AsyncSession

from backend import crud
from backend.models import Channel


class NotifyService:
    """Deliver notifications to stored channels through apprise or Bark.

    Channel rows are looked up through ``crud``; every delivery attempt is
    recorded as a notification row whose status ends up ``sent`` or
    ``failed``.
    """

    # Marker returned by ``_build_apprise_url`` for Bark channels, which are
    # delivered with a direct HTTP request rather than through apprise.
    _BARK_MARKER = "__bark__"
    # Seconds to wait for the Bark server before giving up.
    _BARK_TIMEOUT = 10
    # Notification priority -> Bark ``level`` query parameter.
    _BARK_LEVELS = {"high": "active", "urgent": "critical"}
    # Request headers sent with every Bark call.
    _BARK_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
    _logger = logging.getLogger(__name__)

    def __init__(self):
        # Kept for backward compatibility; each send builds its own Apprise
        # object so URLs from different channels never accumulate.
        self.apobj = apprise.Apprise()

    @staticmethod
    def _webhook_to_apprise(url: str) -> str:
        """Translate a plain HTTP(S) URL into apprise's JSON notifier URL.

        ``https://`` maps to ``jsons://`` so TLS is preserved; ``http://`` or
        a scheme-less value maps to ``json://``. Only the leading scheme is
        stripped, never occurrences further inside the URL.
        """
        lowered = url.lower()
        if lowered.startswith("https://"):
            return "jsons://" + url[len("https://"):]
        if lowered.startswith("http://"):
            return "json://" + url[len("http://"):]
        return "json://" + url

    def _build_apprise_url(self, channel: Channel) -> Optional[str]:
        """Build the apprise URL for a channel from its type and config.

        Returns:
            The apprise URL, the marker ``"__bark__"`` for a fully configured
            Bark channel (delivered separately in ``send_notification``), or
            ``None`` when the type is unknown or required settings are
            missing.
        """
        config = channel.config or {}
        kind = channel.type

        if kind in ("discord", "slack"):
            webhook_url = config.get("webhook_url", "")
            return f"{webhook_url}" if webhook_url else None

        if kind == "telegram":
            bot_token = config.get("bot_token", "")
            chat_id = config.get("chat_id", "")
            if bot_token and chat_id:
                return f"tgram://{bot_token}/{chat_id}"
            return None

        if kind == "email":
            smtp_host = config.get("smtp_host", "")
            smtp_port = config.get("smtp_port") or 587
            username = config.get("username", "")
            password = config.get("password", "")
            to_email = config.get("to_email", "")
            if not (smtp_host and username and password and to_email):
                return None
            quote = urllib.parse.quote
            # Credentials are percent-encoded so characters such as "@", ":"
            # or "/" cannot be mistaken for URL delimiters.
            user_part = quote(str(username), safe="")
            pass_part = quote(str(password), safe="")
            # "@" and "," stay readable in the recipient list; "+" is encoded
            # so it is not decoded as a space.
            to_part = quote(str(to_email), safe="@,")
            return (
                f"mailtos://{user_part}:{pass_part}"
                f"@{smtp_host}:{smtp_port}?to={to_part}"
            )

        if kind == "webhook":
            url = str(config.get("url") or "").strip()
            return self._webhook_to_apprise(url) if url else None

        # Native apprise URL, passed through untouched.
        if kind == "apprise":
            return config.get("url") or None

        # Bark (iOS push): the actual request is made in send_notification.
        if kind == "bark":
            if config.get("base_url", "") and config.get("device_key", ""):
                return self._BARK_MARKER
            return None

        return None

    async def _record_failure(
        self,
        db: AsyncSession,
        channel: Channel,
        channel_id: int,
        notification_id: int,
        error_msg: str
    ) -> Dict[str, Any]:
        """Mark the notification row as failed and build the failure result."""
        await crud.update_notification_status(
            db, notification_id, "failed", error_msg
        )
        return {
            "channel": channel.name,
            "channel_id": channel_id,
            "status": "failed",
            "notification_id": notification_id,
            "error_msg": error_msg
        }

    async def _send_bark(
        self,
        config: Dict[str, Any],
        title: Optional[str],
        body: str,
        priority: str
    ) -> Optional[str]:
        """Push one message to a Bark server.

        Returns ``None`` on HTTP 200, otherwise a short error message. The
        message never contains the request URL, which embeds the device key.
        """
        # Imported lazily, as before: only Bark channels need requests.
        import requests

        base_url = str(config.get("base_url", "")).rstrip("/")
        device_key = urllib.parse.quote(str(config.get("device_key", "")), safe="")

        # HTTPS is downgraded to HTTP by default because the original
        # deployment's Bark server had a broken TLS setup. Set
        # ``"force_http": False`` in the channel config to keep HTTPS.
        if base_url.startswith("https://"):
            if config.get("force_http", True):
                base_url = "http://" + base_url[len("https://"):]
        elif not base_url.startswith("http://"):
            base_url = "http://" + base_url

        # Title and body are path segments, so every reserved character
        # (including "/") must be escaped.
        segments = [base_url, device_key]
        if title:
            segments.append(urllib.parse.quote(str(title), safe=""))
        segments.append(urllib.parse.quote(str(body), safe=""))
        bark_url = "/".join(segments)

        params = {}
        level = self._BARK_LEVELS.get(priority)
        if level:
            params["level"] = level

        request = functools.partial(
            requests.get,
            bark_url,
            params=params,
            timeout=self._BARK_TIMEOUT,
            headers=self._BARK_HEADERS,
        )
        try:
            # requests is blocking; run it in the default executor so the
            # event loop keeps serving other tasks while waiting.
            response = await asyncio.get_running_loop().run_in_executor(
                None, request
            )
        except requests.RequestException as exc:
            self._logger.warning("Bark error: %s", exc)
            return f"Bark request failed: {type(exc).__name__}"

        self._logger.debug(
            "Bark response: %s, %s", response.status_code, response.text[:100]
        )
        if response.status_code != 200:
            return f"Bark server returned HTTP {response.status_code}"
        return None

    async def _send_apprise(
        self,
        apprise_url: str,
        title: Optional[str],
        body: str
    ) -> Optional[str]:
        """Send one message through apprise.

        Returns ``None`` on success, otherwise an error message.
        """
        apobj = apprise.Apprise()
        if not apobj.add(apprise_url):
            # The URL itself is not echoed back: it may contain credentials.
            return "Channel URL was rejected by apprise"

        # The title is embedded in the body as a bold first line.
        message = f"**{title}**\n\n{body}" if title else body

        # Apprise.notify is synchronous; keep it off the event loop.
        delivered = await asyncio.get_running_loop().run_in_executor(
            None, functools.partial(apobj.notify, body=message)
        )
        return None if delivered else "Failed to send notification"

    async def send_notification(
        self,
        db: AsyncSession,
        channel_id: int,
        title: Optional[str],
        body: str,
        priority: str = "normal"
    ) -> Dict[str, Any]:
        """Send a single notification to the channel with the given id.

        Returns:
            A dict with ``channel_id`` and ``status`` (``"sent"``,
            ``"failed"`` or ``"skipped"``). ``channel`` is present whenever
            the channel exists, ``notification_id`` once a notification row
            has been created, and ``error_msg`` for every non-sent outcome.
        """
        channel = await crud.get_channel(db, channel_id)
        if not channel:
            return {
                "channel_id": channel_id,
                "status": "failed",
                "error_msg": "Channel not found"
            }

        if not channel.is_active:
            return {
                "channel": channel.name,
                "channel_id": channel_id,
                "status": "skipped",
                "error_msg": "Channel is inactive"
            }

        # Create the notification record before attempting delivery.
        notification = await crud.create_notification(
            db, channel_id, {
                "title": title,
                "body": body,
                "priority": priority,
                "status": "pending"
            }
        )

        apprise_url = self._build_apprise_url(channel)
        if not apprise_url:
            return await self._record_failure(
                db, channel, channel_id, notification.id,
                f"Invalid configuration for channel type: {channel.type}"
            )

        try:
            if channel.type == "bark":
                error_msg = await self._send_bark(
                    channel.config or {}, title, body, priority
                )
            else:
                error_msg = await self._send_apprise(apprise_url, title, body)

            if error_msg is None:
                # Naive UTC timestamp, as the original code stored.
                await crud.update_notification_status(
                    db, notification.id, "sent", sent_at=datetime.utcnow()
                )
                return {
                    "channel": channel.name,
                    "channel_id": channel_id,
                    "status": "sent",
                    "notification_id": notification.id
                }
        except Exception as exc:
            # Boundary handler: any unexpected error becomes a recorded
            # failure instead of aborting the caller's batch.
            self._logger.exception(
                "Sending notification %s via channel %s failed",
                notification.id, channel.name
            )
            error_msg = str(exc) or type(exc).__name__

        return await self._record_failure(
            db, channel, channel_id, notification.id, error_msg
        )

    async def send_to_channels(
        self,
        db: AsyncSession,
        channels: Optional[List[str]],
        tags: Optional[List[str]],
        title: Optional[str],
        body: str,
        priority: str = "normal"
    ) -> List[Dict[str, Any]]:
        """Send one notification to channels selected by name and/or tag.

        Channels matched more than once (repeated names, or matched by both
        name and tag) receive the notification only once. Names that match no
        channel are logged and skipped.

        Returns:
            One ``send_notification`` result per targeted channel, in
            selection order.
        """
        targets = []
        seen_ids = set()

        def _add_target(channel):
            if channel.id not in seen_ids:
                seen_ids.add(channel.id)
                targets.append(channel)

        # Channels requested by name.
        for channel_name in channels or []:
            channel = await crud.get_channel_by_name(db, channel_name)
            if channel:
                _add_target(channel)
            else:
                self._logger.warning(
                    "Channel %r not found; skipping", channel_name
                )

        # Channels requested by tag, merged without duplicates.
        if tags:
            for channel in await crud.get_channels_by_tags(db, tags):
                _add_target(channel)

        # Sent one at a time: all sends go through the same ``db`` session.
        results = []
        for channel in targets:
            results.append(
                await self.send_notification(
                    db, channel.id, title, body, priority
                )
            )
        return results
|