From 53bc48fee64776ead4c08727e139bdc93690c087 Mon Sep 17 00:00:00 2001 From: OpenClaw Agent Date: Sat, 7 Feb 2026 17:30:04 +0000 Subject: [PATCH] feat: add notification service with apprise integration and tests --- backend/__pycache__/crud.cpython-310.pyc | Bin 0 -> 4267 bytes .../notify_service.cpython-310.pyc | Bin 0 -> 3962 bytes backend/crud.py | 20 +- backend/notify_service.py | 189 ++++++++++++++++++ ...otify_service.cpython-310-pytest-9.0.2.pyc | Bin 0 -> 5075 bytes backend/tests/test_notify_service.py | 106 ++++++++++ 6 files changed, 305 insertions(+), 10 deletions(-) create mode 100644 backend/__pycache__/crud.cpython-310.pyc create mode 100644 backend/__pycache__/notify_service.cpython-310.pyc create mode 100644 backend/notify_service.py create mode 100644 backend/tests/__pycache__/test_notify_service.cpython-310-pytest-9.0.2.pyc create mode 100644 backend/tests/test_notify_service.py diff --git a/backend/__pycache__/crud.cpython-310.pyc b/backend/__pycache__/crud.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..260515fce137a3a56c4e4578ed968aab0fad583e GIT binary patch literal 4267 zcmaJ^%WoUU8Q+pnx7~6ulK_0Rz3Hf5luIJMlTUBu)DJW=U!(t6gG;JM(?B z^F4mwV>^?RWee9kw;!~AHf>q|rb+&)VsZs{{4Zu%!V)aB+N{CayusUc!)`kbhtWC@ z-L}{8%(ER9+Qmk(U22rtEo;)*#)Eb;Cc@wgCQVNPTX! zkC}ZH@fC3a`_35dqLE;l^xJgAS4ADCpB2m3t@@>Bj!ex z?{-xymX{*ZbLUbZBb>at;RiQmBvu2}6D!>vJ+UQ|=0I^0>l4zD?=7<;Qwwyg6ZviV zKkc9pkd~UkQ1?`go=3W=S(=`v;RMkKx|oG|S|v5I;S`?qY)+Eq#!i#YB^#+l8qQ#N zCWz08Z??MH4O{J25|B@ZmxBivH~A-C!7%Qh6n^PDY2H6}YLkEZ9{DKKqHshD?R7Rc zW0|NI@Iqb4d=zWj55u~Xd1na}LsrMBRmrIpPmySp&G??fYOMTa#a8F(#ZKaH#bZ_` zP3Ydo9sdHuJ(gH0vYX()=fIau_}a2|?UY}$-a5VOq|OZ%&)urxpLb7fSr4tmeqe1f z;RyGBF=f}ReYVe39Xf;efH4caHloezUBc@>e*5oF{_w{qzx(x*cmDF>H80#{_XLT-~Rd8-h0pPz1#X2hOaZD^?HD`JGv-fK(oGuJ0`#0 zgV1jkc3H}PW8JoXZsAxihUPZzM^y5aSF{ZiJVL+@qmV3;gGXs?OGff;S6wE1$K-6j zH*fvwLf*cF8``(B7IqL#@#U4#uDY99y3~rBevq_oOZ5`o0Z&Fd+C#7)D!^mn%@8VZ z@h~!C<62!9$d-w90o#>LvFG9jL^(r;66XFvYcaPz{D^IVb0-WzMh%d$R6@Xr^q5XpAQb8pxXLzyhkPzvR2luA92T>H%yCPwmu6-F+J%nt|5# znAT2e=d=d0vxs)HJm$6%*^UvxGVzl{sc%9AMJPq$8YNjpx~p$v(DqHaqutwn*pts0 zpx1@FMh2*Zec8cRTubg9b8QO8j!c2V0aGxTV&LSQb91g1 za^$I2itXv@KTL7s6tZj6Gi;e~C5!XR0Q}u?ZC48xG&~WItLHafNeGeo( z1^Z5 zGl)ay|JNV@+Ejpcz0^~yDNnto2Z(#k0$}bni`ZQngLpahauD~15TD2uEn_KDdgmaH zDZT0{J^cU!!9Plxev&g|A%p)F;u$fjS7}PwOIPw%IBL?xK`7*Tnwgc}z-ChcknIXo z>x#&Np7{-O#B1Pq&VVBrMW7Ol5>zeZ09#GLSUh1jkSD0i!kVKN25YEjgEbfwJYK(W zU>P2_kJ^XIU!t6dJyyS*xVr~ipi&ZuUK$QYMDli z(p_QdCm@U!Vge=s(j=J#5BC`L4)krbI|#wVPMjNV%K9Gu3kOI){UWU#z+Qu#%!MtS z`;?IZMCu@rrBQ7XJGbkQ8(!#_53JM{?ixnwE!Gvo*=WR_&0Z9YNk|EVynP+F$)Big zXcdt2sk$L;yvuJP!5u;l8SQ288kA}`==7rGkeF5C@|wUG0t#FA@-DrWV?dK|@u)Qo zM)PgLXq$GEPU46430;nFg?<=p%l3}?DV=K^Yx-LQ4ihY|6M?*k>GMn=(PC z5Qg|9F+KgTuP|>hLFH16#A=;(AL)bVkC*i}SO@N}Fo)OBP?W!@IcMN}hpRV1t#e~* zb=EwuucJRD+nYINOWI3zy7*d9WE4f#S7d%gNYV3$%C5+}$%;?kR;`Yre5NRrv#2Q2 zQ;9ZIa~nFUt*EQj)xqb*O1p#N5}Rg2k+HPhg0oDGvSU==GxVPGG*BC4x&=iANIM&y zPH1Y5q9m*+M;LG$6?3*a>_>`gIIS1T%cO057sDy5!sbwdr`aN3be5eZZ)uVjIOAV( x&c7(z{6C&k{(OFhm-A~JYkZ0`=d+`Kj9+;He-WyB_wLr`2r0Ab zn%eHF>gwvQU)Ln%vTMNeQRhnZ%s#{TJN_AcO!#vGS~>@W8=S;OlQc+^HfYmqm`&DT z1p8@RXcig;&6}~+v>UeOS?n|%7%#-drrU6JpB0yyUc)2C%Lcc(vubeXb=oL%Hfz+2 zcVHf)&J=q!N;Bo0X=hQ=3Sz}hMPa7Q7g{TIq8uJ%B8!>=c<+UDr4`PKGzGyK=wPAj zaCqA%7lT$y#35Q7eX8*11hn+GK!njC+-OivI9)XwCN~A+OcaE*&zNVtz^zr*u!}a| z#U0`Bq9}5AFv?59;vO_6Tw#gQJOvGpmw9Ei(D1f2#z4A^*7g~Ee1w18N_pE3SgP;| zUIj}PJ_&Kzt;#b=7R|5B3V9(4MM%K+%?Ei3?Ig6c0wgo~WX;G8pK=3~$xPi;cF=Ci zC>6?nVaODMb~3kw5lclp|2spDf$F_8(~C({Ooxk6YhijSX^U1E2g}pTi9DCKgHTKl z?Hq1te-4MDl?gdL7lh|Ti%$=F{dC~wRC`4^z8|%s%=e4v9{O1!-93-2W@=MJ+2LT* zlqE&hk*yHrHy;leJg$134On$fmW(!wj2;D!aB_~yCvw`SoN{xG_RQRryL0n8#DE`| zMQOqr$ULWI3R;F~-qL&_XY$#+AfM%S&aT6;%nR3zn{X_*w2PoEwT7!TEX|j=2ea9l zFYA6s^A*h(H9w|#H+N(^FG6!=n~&!X%rcP|fmU-%^ES}Q+=Y4Fk$HE)ysO&I_R4;= zV`aNRrmto8=($RHN#5d5S z=hC%1m*2Vlr;pcf{bb|T>iWkY)rNtr}|bb9(8nN|hS45C;S(`MH87Zb?*0ozVw7QF&zKsg;a zQmvpVl+zB<6pkZ@2@+o`c3o7@l2n^JdXJpwvgfnpTdI)8a4wZSbWEAxS=U}llNLyJ zoy9C`qfXaG4j5U2Mb7~ISYgtdj}{cmR@y?D3nEjt6m763>Yl_LlL#a7OGtJj*^2}q z)Q|^}Jc;BfBu^u$A^9Q_OtJwP2ToP}xlR;we;7~<9Uk-a6cCrX)FX^ogw@=?dBh_Q z87FMN`%mPDKl*vX#@&An zZkUjV))Gyh1ZqMr04%p}K#qYPL%rP0Szh1(wKv$BIkaKtM!%3-YozDoPI`vBpbhzQ zllF?BXM>)-+n%deBqX2UXm08#Iie^`e3^?R8t8}-S~=U|7{-~QR1OPANL{$k_i z8szltU;SqNjrTX+y0ZS>wLyx{C~F=PN$~E(@batyp4aA+PK(z~c^GUcE6sweld7VS zGLe2WT~KyIb)UZ0Q-$fy#)gFkdj6{FN2tXwgT zSCgR>P_Fo+n(`2?^chs%-)Unt^T@D2j4=i85s@T9f-(cnFSSRnSysiD zE`~x^O3F+#S+5Kb>Z_=#bx4&)07yqnmQl!VLSYpw2)vKePMj$sb|TmW z#^n+BzX_V@Pl3=&)gmL5QbLLK(6X53fPcUo=pX*r1NRfxWdLZ5xWxU0F&x2h>ppcH z>Jnx$06x!Tl#vSYwj`Vf%-*Tv+^;y!W0tD-olEq7_vx9@B5LX}&y_RW#uSi0SV%(~ zR{ol^!RJwT7P2D(cu_bw3m;1YCd7kghp@@N)eL4YZdsxrZzH)uPnea}GyyyGpJ3%}ckrU`D*76efUYZPvWFivgHu%*{+oihh&UaD0G Lis async def get_channels_by_tags(db: AsyncSession, tags: List[str]) -> List[Channel]: """获取包含任一标签的所有活动通道""" - conditions = [Channel.tags.contains([tag]) for tag in tags] - result = await db.execute( - select(Channel).where( - and_( - Channel.is_active == True, - or_(*conditions) - ) - ) - ) - return result.scalars().all() + # SQLite JSON 查询:使用 json_extract 或 like 匹配 + all_channels = await get_channels(db, limit=1000) + result = [] + for channel in all_channels: + if not channel.is_active: + continue + channel_tags = channel.tags or [] + if any(tag in channel_tags for tag in tags): + result.append(channel) + return result async def create_channel(db: AsyncSession, channel_data: Dict[str, Any]) -> Channel: db_channel = Channel(**channel_data) diff --git a/backend/notify_service.py b/backend/notify_service.py new file mode 100644 index 0000000..e096ca7 --- /dev/null +++ b/backend/notify_service.py @@ -0,0 +1,189 @@ +from typing import List, Optional, Dict, Any +from datetime import datetime +from sqlalchemy.ext.asyncio import AsyncSession +import apprise +from backend import crud +from backend.models import Channel + +class NotifyService: + def __init__(self): + self.apobj = apprise.Apprise() + + def _build_apprise_url(self, channel: Channel) -> Optional[str]: + """根据通道类型构建 apprise URL""" + config = channel.config or {} + + if channel.type == "discord": + webhook_url = config.get("webhook_url", "") + if webhook_url: + return f"{webhook_url}" + + elif channel.type == "telegram": + bot_token = config.get("bot_token", "") + chat_id = config.get("chat_id", "") + if bot_token and chat_id: + return f"tgram://{bot_token}/{chat_id}" + + elif channel.type == "email": + smtp_host = config.get("smtp_host", "") + smtp_port = config.get("smtp_port", 587) + username = config.get("username", "") + password = config.get("password", "") + to_email = config.get("to_email", "") + + if username and password and to_email: + return f"mailtos://{username}:{password}@{smtp_host}:{smtp_port}?to={to_email}" + + elif channel.type == "slack": + webhook_url = config.get("webhook_url", "") + if webhook_url: + return f"{webhook_url}" + + elif channel.type == "webhook": + url = config.get("url", "") + if url: + return f"json://{url.replace('https://', '').replace('http://', '')}" + + # 支持 apprise 原生 URL 格式 + elif channel.type == "apprise": + return config.get("url", "") + + return None + + async def send_notification( + self, + db: AsyncSession, + channel_id: int, + title: Optional[str], + body: str, + priority: str = "normal" + ) -> Dict[str, Any]: + """发送单条通知到指定通道""" + channel = await crud.get_channel(db, channel_id) + if not channel: + return { + "channel_id": channel_id, + "status": "failed", + "error_msg": "Channel not found" + } + + if not channel.is_active: + return { + "channel": channel.name, + "channel_id": channel_id, + "status": "skipped", + "error_msg": "Channel is inactive" + } + + # 创建通知记录 + notification = await crud.create_notification( + db, channel_id, { + "title": title, + "body": body, + "priority": priority, + "status": "pending" + } + ) + + # 构建 apprise URL + apprise_url = self._build_apprise_url(channel) + if not apprise_url: + error_msg = f"Invalid configuration for channel type: {channel.type}" + await crud.update_notification_status( + db, notification.id, "failed", error_msg + ) + return { + "channel": channel.name, + "channel_id": channel_id, + "status": "failed", + "notification_id": notification.id, + "error_msg": error_msg + } + + try: + # 发送通知 + apobj = apprise.Apprise() + apobj.add(apprise_url) + + # 构建消息 + message = body + if title: + message = f"**{title}**\n\n{body}" + + # 发送 + result = apobj.notify(body=message) + + if result: + await crud.update_notification_status( + db, notification.id, "sent", sent_at=datetime.utcnow() + ) + return { + "channel": channel.name, + "channel_id": channel_id, + "status": "sent", + "notification_id": notification.id + } + else: + error_msg = "Failed to send notification" + await crud.update_notification_status( + db, notification.id, "failed", error_msg + ) + return { + "channel": channel.name, + "channel_id": channel_id, + "status": "failed", + "notification_id": notification.id, + "error_msg": error_msg + } + + except Exception as e: + error_msg = str(e) + await crud.update_notification_status( + db, notification.id, "failed", error_msg + ) + return { + "channel": channel.name, + "channel_id": channel_id, + "status": "failed", + "notification_id": notification.id, + "error_msg": error_msg + } + + async def send_to_channels( + self, + db: AsyncSession, + channels: Optional[List[str]], + tags: Optional[List[str]], + title: Optional[str], + body: str, + priority: str = "normal" + ) -> List[Dict[str, Any]]: + """批量发送通知到多个通道或按标签发送""" + results = [] + target_channels = [] + + # 按名称获取通道 + if channels: + for channel_name in channels: + channel = await crud.get_channel_by_name(db, channel_name) + if channel: + target_channels.append(channel) + + # 按标签获取通道 + if tags: + tagged_channels = await crud.get_channels_by_tags(db, tags) + # 合并去重 + existing_ids = {c.id for c in target_channels} + for channel in tagged_channels: + if channel.id not in existing_ids: + target_channels.append(channel) + existing_ids.add(channel.id) + + # 发送通知 + for channel in target_channels: + result = await self.send_notification( + db, channel.id, title, body, priority + ) + results.append(result) + + return results diff --git a/backend/tests/__pycache__/test_notify_service.cpython-310-pytest-9.0.2.pyc b/backend/tests/__pycache__/test_notify_service.cpython-310-pytest-9.0.2.pyc new file mode 100644 index 0000000000000000000000000000000000000000..041cdee2bcafa8f892fd19c88200bf96606aa94f GIT binary patch literal 5075 zcmb7|TW}o36^5s0XJ_xL)y1-8gAoRU*TE}aWMhc!*jy5lfRdC6R5qceM%yE4rP&KT zv$mzortAp9B@jb}R0T&Vq!bC5Do9?aG8C0Oq$)3Y%~QYRA;_RSy<62 zM5;M+y8BG`nV#A_07enC$F7Gk~*En;vGE;ZG7SlDR z)r@kSg)L^;R4s{eLfw<)B<{MKs-?^6TBe*)b}=_w%awCVH{5)!uiRHFlnbgHcl&Dt zM$)YZeM;euR8E@KtWwyv<_N2E^ubgr`uiB`ixxcskaHC?oGEuEpeTz@Q+Ng*Y z$2~XM@T=1cryOy*T5&4Wcju2mN#VL*$DB<;Ij{TLlJ-^1&_kFlpDk$oen}M$@TRT>H%UnTDA6ns&t*x0_8-^_;zR^@e*Z zPThCJIQ8b#`MB?RzNe%W8dzR92c_nM)cF*xB`*u1pUdh6X0ie+{aavNX%(87&R-r% z64xJbc`u;lJmbvQR@kaGr*oahp4LaTUueJ7+su!x7=acrZtyratBfa>W2-tQMt?|a z>wbJmYsY+(CzsfTSK9_p1%{OfVk=2Jqt9WHa4d|~cw`?B;ygF1wN0LHCjv7_%6xRoz}byuRVH3h9zd%z zw3AmEc(Zjm1`o1)gpUSUyZ1>%aPWPsQ4go?d(BJ!#e(qK4gTdcN(qJZX4X$(Ptw{n)W0TO8Q6 zyScEx=#3paHb&}1(ZlACp^n7FV&3+!uzl5bxad*kVVSPX+V#5QT2(M$anE&K94dP0 z?^7L=v9Q;JMXyxl>9$NnPhy*Hqp3o72T2^d+B?wQ;Xrp62c^+!Rk@6loRG0u=d4K3 zbNPyJY~Qh>?gY)dAVSNmaQv32Tc>Te<;Z@#EDv{RwPO1~o7B-?`b2YqcCcj&*>8bu zt|gqNz!z!S!jUPcM zU81EYM=Wo)(Q>)!+M012Q?_fVZ#xi9sv6kgR-;~Vig^VGnQ7IlPqrLoCw9;a0bB~M zGE)PvBicw)I9|*3Wrl_hyW7`WIM`b_5Eequ5A_!A?lqg}Z8_XqIHH&t3R-b;NX?#_ zT)$yOE4m*o-cAVp+(^vSb*6)R`G1*aS~r=&vMl|dIpZfR&367%dEVe2=BywLsL>^f-6p=^ywl0wiu;0AA%y)Fh{u7Pu4`A#+(|Dv|>>X-sa97>482c8*-<%(g*UQ{YJG%*I|8WCn$6B}I z%SRY{?!EPAE_7+T_WH#SFTb^Z?&A9SpSEb1hw4ddQV@T8s_wHyA8JGa;_CLzU>ku} znzjq1d6x`9D^?7U7=!?G?HNxdeaCfXgk8Iu-hbfW#G%9Yp1kjV8CUuxU8bfQzU4QN z1t^;NfS?${(~q_&@d#f+sUd&G7SygI2hzj{#3i=4kJtzzhLD63L2_Tw+Z~~_m~MA~ ztf1@CVBwC3+XT#gil5>Z>h~sy4g5S3^7F`s4Mr{yAw9b2k@Lv5o=CMBQVMMp`Ru$- z8Q7Cp-Uxj$F$@zy@o!S?dqf4jv`;c|Jql7sXPi1{D2_isJsS}Exoz6w{tbwz0an$f z+au2Wub$wD2RCCVY1po;tM3kN$exBpTu1x_L3-(_%=j`+9qM$vJq?P zJ|!l|;1G#NNZbR_3)(~CC{=&4TOE@5o?e^%;t1K?OM(*C9{BF~{F&ViYmeLphK&&* zI0Ui5@}HU}=}EvCdLqPf0a$*=sI#4)h$ryShBZU`gECnK&1{147|(K?Zg}6a4y(dvSQ&(BKFEjo7CyXe@T~|68+#+* z$B{aZ0Qi>y@zKCU>Kv!knWJaeZzE)Oai0M06F_E?-`q|G$smP_7L}%2%Zt{$M~&Xi+9O_fG*xA3(|nxTa})Skc%L}cPiXx_-*|5 zAfr;=Y=mC`xto9|k$|V+p*c6u&cWAokPFiAH6P?(V%~Q2O5Wz-?JjCj-rmvmw@}>}ZQ|R#~2XFH2^o;=lzU;jrBW2UU}`q_up81`T39E{*Be5nzawk!MwNT%B$~C z@Hltzqt|}j6D!0CSj%)%G{`tDjc(Oq&a3&&Qpcib$YcwtoJs(bWR0UHN3dyFszaF~b8EKY`O>J?L4<2T zC47q(g|{oqFRnemy7uauYaje@?eb-5%vNV+#n;G13NPa8BoxOKc_yj&O%mUNK(c;- zA`Ac;vK7iTww6>LTxqoG{#=V zYQdsRffj~$0)q105k~)=tQUE27q-{*>;~}W6?k7_w|^q)=%jd)Om@x?D)h)vD?Fu? z!mx;?SrX0}QT1^`=~hu*pz2iMl4kgb%y$kr^s7#_p@O2!Vdi|bAn7O}C?g0DO~c-k zuI*N4o!UalIqPG<@F@!BvJm~|QR235PuW<*^y7;9nMEd|w(wLo+_BvFlZXnuGBI5} zi<6lXeng{Yi+SwiXdpTcsPLnvqG#&WN63;J$)h#wP}ez*vw`M~LG09$h-C#FEa<}I Q>S>0<1~Z3o-JECt1sFALRR910 literal 0 HcmV?d00001 diff --git a/backend/tests/test_notify_service.py b/backend/tests/test_notify_service.py new file mode 100644 index 0000000..9f05e9d --- /dev/null +++ b/backend/tests/test_notify_service.py @@ -0,0 +1,106 @@ +import pytest +import pytest_asyncio +import uuid +from unittest.mock import Mock, patch +from sqlalchemy.ext.asyncio import AsyncSession +from backend.database import AsyncSessionLocal, init_db +from backend import crud +from backend.notify_service import NotifyService + +@pytest_asyncio.fixture +async def db(): + await init_db() + async with AsyncSessionLocal() as session: + yield session + await session.rollback() + +@pytest.mark.asyncio +async def test_send_to_discord(db): + """测试发送到 Discord""" + unique_name = f"测试Discord_{uuid.uuid4().hex[:8]}" + # 创建测试通道 + channel = await crud.create_channel(db, { + "name": unique_name, + "type": "discord", + "config": {"webhook_url": "https://discord.com/api/webhooks/test"} + }) + + service = NotifyService() + + with patch('apprise.Apprise.notify') as mock_notify: + mock_notify.return_value = True + result = await service.send_notification( + db, channel.id, "测试标题", "测试内容", "normal" + ) + + assert result['status'] == 'sent' + assert result['channel_id'] == channel.id + mock_notify.assert_called_once() + +@pytest.mark.asyncio +async def test_send_to_multiple_channels(db): + """测试批量发送到多个通道""" + # 创建多个通道 + name_a = f"通道A_{uuid.uuid4().hex[:8]}" + name_b = f"通道B_{uuid.uuid4().hex[:8]}" + await crud.create_channel(db, { + "name": name_a, + "type": "discord", + "config": {"webhook_url": "https://discord.com/webhook"}, + "tags": ["alerts"] + }) + await crud.create_channel(db, { + "name": name_b, + "type": "telegram", + "config": {"bot_token": "123456:ABC", "chat_id": "12345"}, + "tags": ["alerts"] + }) + + service = NotifyService() + + with patch('apprise.Apprise.notify') as mock_notify: + mock_notify.return_value = True + results = await service.send_to_channels( + db, [name_a, name_b], None, "测试", "内容", "normal" + ) + + assert len(results) == 2 + assert all(r['status'] == 'sent' for r in results) + +@pytest.mark.asyncio +async def test_send_by_tags(db): + """测试按标签发送""" + unique_tag = f"alerts_{uuid.uuid4().hex[:8]}" + name_prod = f"生产告警_{uuid.uuid4().hex[:8]}" + name_dev = f"开发告警_{uuid.uuid4().hex[:8]}" + name_info = f"普通通知_{uuid.uuid4().hex[:8]}" + + await crud.create_channel(db, { + "name": name_prod, + "type": "discord", + "config": {"webhook_url": "https://discord.com/webhook"}, + "tags": [unique_tag, "production"] + }) + await crud.create_channel(db, { + "name": name_dev, + "type": "telegram", + "config": {"bot_token": "123456:ABC", "chat_id": "12345"}, + "tags": [unique_tag, "dev"] + }) + await crud.create_channel(db, { + "name": name_info, + "type": "email", + "config": {"username": "test@test.com", "password": "pass", "to_email": "to@test.com", "smtp_host": "smtp.test.com"}, + "tags": ["info"] + }) + + service = NotifyService() + + with patch('apprise.Apprise.notify') as mock_notify: + mock_notify.return_value = True + results = await service.send_to_channels( + db, None, [unique_tag], "告警", "服务器异常", "high" + ) + + assert len(results) == 2 # 只有带唯一标签的通道 + assert mock_notify.call_count == 2