import asyncio
import functools
import logging
import urllib.parse
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import apprise
from sqlalchemy.ext.asyncio import AsyncSession

from backend import crud
from backend.models import Channel

logger = logging.getLogger(__name__)


class NotifyService:
    """Send notifications to configured channels.

    Most channel types are translated into an Apprise URL and delivered
    through Apprise.  Bark (iOS push) channels are delivered with a direct
    HTTP GET against the Bark server instead.
    """

    # Marker returned by ``_build_apprise_url`` for Bark channels: the
    # configuration is valid, but delivery bypasses Apprise.
    _BARK_SENTINEL = "__bark__"

    # Notification priority -> Bark "level" query parameter.
    _BARK_LEVELS = {"high": "active", "urgent": "critical"}

    def __init__(self):
        # Kept for backward compatibility with code that reads ``apobj``;
        # every send builds its own Apprise instance so that channels never
        # leak into each other.
        self.apobj = apprise.Apprise()

    def _build_apprise_url(self, channel: Channel) -> Optional[str]:
        """Build the Apprise URL for *channel* from its type and config.

        Returns:
            The Apprise URL, the Bark sentinel for a valid Bark channel, or
            ``None`` when the channel type is unknown or a required
            configuration value is missing.
        """
        config = channel.config or {}
        kind = channel.type

        if kind in ("discord", "slack"):
            # The native webhook URL is handed to Apprise unchanged.
            webhook_url = config.get("webhook_url", "")
            return f"{webhook_url}" if webhook_url else None

        if kind == "telegram":
            bot_token = config.get("bot_token", "")
            chat_id = config.get("chat_id", "")
            if bot_token and chat_id:
                return f"tgram://{bot_token}/{chat_id}"
            return None

        if kind == "email":
            smtp_host = config.get("smtp_host", "")
            smtp_port = config.get("smtp_port", 587)
            username = config.get("username", "")
            password = config.get("password", "")
            to_email = config.get("to_email", "")
            # smtp_host is required too: without it the URL has no host.
            if not (smtp_host and username and password and to_email):
                return None
            # Percent-encode the credentials and recipient so characters
            # such as '@', ':', '/' or '+' cannot corrupt the URL.
            user = urllib.parse.quote(str(username), safe="")
            secret = urllib.parse.quote(str(password), safe="")
            recipient = urllib.parse.quote(str(to_email), safe="@")
            return f"mailtos://{user}:{secret}@{smtp_host}:{smtp_port}?to={recipient}"

        if kind == "webhook":
            url = config.get("url", "")
            if not url:
                return None
            # Preserve transport security: https -> jsons://, http -> json://.
            if url.startswith("https://"):
                return "jsons://" + url[len("https://"):]
            if url.startswith("http://"):
                return "json://" + url[len("http://"):]
            return f"json://{url}"

        if kind == "apprise":
            # The config already holds a native Apprise URL.
            return config.get("url") or None

        if kind == "bark":
            # Bark is sent directly in send_notification; only validate here.
            if config.get("base_url", "") and config.get("device_key", ""):
                return self._BARK_SENTINEL
            return None

        return None

    @staticmethod
    def _outcome(
        channel: Channel,
        channel_id: int,
        status: str,
        notification_id: Optional[int] = None,
        error_msg: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Build the result dict returned to callers of send_notification."""
        outcome: Dict[str, Any] = {
            "channel": channel.name,
            "channel_id": channel_id,
            "status": status,
        }
        if notification_id is not None:
            outcome["notification_id"] = notification_id
        if error_msg is not None:
            outcome["error_msg"] = error_msg
        return outcome

    def _send_bark(
        self,
        config: Dict[str, Any],
        title: Optional[str],
        body: str,
        priority: str,
    ) -> bool:
        """Deliver one push through a Bark server (blocking HTTP GET).

        Returns ``True`` only when the server answers with HTTP 200.
        """
        # Imported lazily, as in the original code, so the module still
        # loads when ``requests`` is not installed and Bark is unused.
        import requests

        base_url = str(config.get("base_url", "")).rstrip("/")
        device_key = urllib.parse.quote(str(config.get("device_key", "")), safe="")

        # NOTE(review): HTTPS is downgraded to HTTP by default because the
        # original deployment's Bark server had a broken TLS setup.  Set
        # ``force_http: false`` in the channel config to keep HTTPS.
        force_http = config.get("force_http", True)
        if base_url.startswith("https://"):
            if force_http:
                base_url = "http://" + base_url[len("https://"):]
        elif not base_url.startswith("http://"):
            base_url = "http://" + base_url

        # Title and body travel as path segments and must be fully escaped.
        segments = [base_url, device_key]
        if title:
            segments.append(urllib.parse.quote(str(title), safe=""))
        segments.append(urllib.parse.quote(str(body), safe=""))
        bark_url = "/".join(segments)

        params = {}
        level = self._BARK_LEVELS.get(priority)
        if level:
            params["level"] = level

        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        try:
            response = requests.get(
                bark_url, params=params, timeout=10, headers=headers
            )
        except requests.RequestException as exc:
            logger.warning("Bark error: %s", exc)
            return False

        logger.info(
            "Bark response: %s, %s", response.status_code, response.text[:100]
        )
        return response.status_code == 200

    async def _send_apprise(
        self, apprise_url: str, title: Optional[str], body: str
    ) -> bool:
        """Deliver one message through Apprise without blocking the loop."""
        apobj = apprise.Apprise()
        if not apobj.add(apprise_url):
            # The URL is not logged because it may contain credentials.
            logger.warning("Apprise rejected the generated channel URL")
            return False

        # The title is folded into the body as a bold first line.
        message = f"**{title}**\n\n{body}" if title else body
        return bool(await apobj.async_notify(body=message))

    async def send_notification(
        self,
        db: AsyncSession,
        channel_id: int,
        title: Optional[str],
        body: str,
        priority: str = "normal"
    ) -> Dict[str, Any]:
        """Send a single notification to the channel with id *channel_id*.

        A notification record is created with status "pending" and then
        updated to "sent" or "failed" according to the delivery outcome.

        Returns:
            A dict with ``channel_id`` and ``status`` ("sent", "failed" or
            "skipped"), plus ``channel``, ``notification_id`` and
            ``error_msg`` where applicable.
        """
        channel = await crud.get_channel(db, channel_id)
        if not channel:
            return {
                "channel_id": channel_id,
                "status": "failed",
                "error_msg": "Channel not found"
            }

        if not channel.is_active:
            return self._outcome(
                channel, channel_id, "skipped", error_msg="Channel is inactive"
            )

        # Record the notification before attempting delivery.
        notification = await crud.create_notification(
            db,
            channel_id,
            {
                "title": title,
                "body": body,
                "priority": priority,
                "status": "pending"
            }
        )

        apprise_url = self._build_apprise_url(channel)
        if not apprise_url:
            error_msg = f"Invalid configuration for channel type: {channel.type}"
            await crud.update_notification_status(
                db, notification.id, "failed", error_msg
            )
            return self._outcome(
                channel, channel_id, "failed", notification.id, error_msg
            )

        error_msg = "Failed to send notification"
        # Only delivery sits inside the try: a failure while recording a
        # successful send must not re-label the notification as "failed".
        try:
            if channel.type == "bark":
                # requests is blocking, so run it in the default executor.
                loop = asyncio.get_running_loop()
                delivered = await loop.run_in_executor(
                    None,
                    functools.partial(
                        self._send_bark,
                        channel.config or {},
                        title,
                        body,
                        priority,
                    ),
                )
            else:
                delivered = await self._send_apprise(apprise_url, title, body)
        except Exception as exc:  # boundary: any delivery error is recorded
            logger.exception(
                "Delivery to channel %s failed", channel_id
            )
            delivered = False
            error_msg = str(exc)

        if delivered:
            # Naive UTC timestamp, equivalent to the deprecated utcnow().
            sent_at = datetime.now(timezone.utc).replace(tzinfo=None)
            await crud.update_notification_status(
                db, notification.id, "sent", sent_at=sent_at
            )
            return self._outcome(channel, channel_id, "sent", notification.id)

        await crud.update_notification_status(
            db, notification.id, "failed", error_msg
        )
        return self._outcome(
            channel, channel_id, "failed", notification.id, error_msg
        )

    async def send_to_channels(
        self,
        db: AsyncSession,
        channels: Optional[List[str]],
        tags: Optional[List[str]],
        title: Optional[str],
        body: str,
        priority: str = "normal"
    ) -> List[Dict[str, Any]]:
        """Send one notification to channels selected by name and/or tag.

        Channels are resolved by name first, then by tag.  Each channel
        receives the notification at most once, even when it is listed
        twice or matches both a name and a tag.  Unknown names are logged
        and skipped.

        Returns:
            One send_notification result per targeted channel, in
            resolution order.
        """
        targets: List[Channel] = []
        seen_ids = set()

        # Resolve channels by name.
        for channel_name in channels or []:
            channel = await crud.get_channel_by_name(db, channel_name)
            if not channel:
                logger.warning("Channel %r not found; skipping", channel_name)
                continue
            if channel.id not in seen_ids:
                seen_ids.add(channel.id)
                targets.append(channel)

        # Resolve channels by tag, skipping any already selected by name.
        if tags:
            for channel in await crud.get_channels_by_tags(db, tags):
                if channel.id not in seen_ids:
                    seen_ids.add(channel.id)
                    targets.append(channel)

        # Deliver sequentially: all sends share the same DB session.
        results = []
        for channel in targets:
            results.append(
                await self.send_notification(
                    db, channel.id, title, body, priority
                )
            )
        return results