-
Notifications
You must be signed in to change notification settings - Fork 3
/
tasks.py
48 lines (38 loc) · 1.48 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import asyncio
from aioclock.group import Group
from aioclock import AioClock, Every, Once
from src.config import settings
from src.utils.logger import logger
from games.hamster.tapper import Tapper
from src.utils.scripts import getSessions
from src.telegram.multiClients import connectAndCacheClients
group = Group()
app = AioClock()
app.include_group(group)
@app.task(trigger=Once())
async def once():
logger.info('tasks on startup!')
await connectAndCacheClients(
'hamster_kombat_bot',
'https://hamsterkombatgame.io',
f"kentId{settings.OWNERS[0]}"
)
referal_tasks = [asyncio.create_task(Tapper(session).add_referral()) for session in getSessions()]
tasks_tasks = [asyncio.create_task(Tapper(session).run()) for session in getSessions()]
daily_tasks = [asyncio.create_task(Tapper(session).daily_events()) for session in getSessions()]
all_jobs = referal_tasks + tasks_tasks + daily_tasks
await asyncio.gather(*all_jobs)
@group.task(trigger=Every(minutes=60))
async def every():
logger.info("Cache on Every 60 minutes")
await connectAndCacheClients(
'hamster_kombat_bot',
'https://hamsterkombatgame.io',
f"kentId{settings.OWNERS[0]}"
)
@group.task(trigger=Every(minutes=120))
async def every():
logger.info("Daily on Every 120 minutes")
sessions = getSessions()
tasks = [asyncio.create_task(Tapper(session).daily_events()) for session in sessions]
await asyncio.gather(*tasks)