-
Notifications
You must be signed in to change notification settings - Fork 1
/
scheduler.py
63 lines (44 loc) · 1.83 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import time
import logging
import schedule
import ai.utils
from timeit import default_timer
from server.analyze import analyze
# Load the models once, at import time; run_first_time() and work() both pass
# these same three objects to analyze() instead of reloading them per run.
# The once-per-hour cadence comes from scheduler()'s default interval.
en_nlp, el_nlp, lang_det = ai.utils.Models.load_models()
def run_first_time():
    """Run the initial analyze() pass, flagged as the first run.

    Logs the first-run marker, calls analyze() on the module-level models with
    ``first_run=True`` and returns ``schedule.CancelJob``, the value the
    schedule library treats as a request to drop the job after it has run.
    """
    logging.info('ML Pipeline: First Run.')
    models = (en_nlp, el_nlp, lang_det)
    analyze(*models, first_run=True)
    return schedule.CancelJob
def work():
    """Run one regular analyze() pass on the module-level models."""
    models = (en_nlp, el_nlp, lang_det)
    analyze(*models)
def scheduler(schedule_interval_secs=60 * 60, startup_delay_secs=30):
    """Run analyze() once at startup, then repeat it on a fixed interval.

    The first pass goes through run_first_time(); every later pass goes through
    work(). The time a pass takes is counted towards the wait before the next
    one, so passes start roughly ``schedule_interval_secs`` apart instead of
    that long after the previous pass finished. The loop never exits on its
    own; it runs until the process is stopped or a pass raises.

    Args:
        schedule_interval_secs: Target number of seconds between the starts of
            two consecutive passes. Defaults to one hour.
        startup_delay_secs: Seconds to stall before the first pass, so Neo4j
            has time to start. Defaults to 30, the value that used to be
            hard-coded here.
    """
    # The startup stall sits inside the timed window, so it counts towards the
    # first interval just like the processing time of the first pass does.
    start = default_timer()
    time.sleep(startup_delay_secs)
    # run_first_time() returns schedule.CancelJob, so it is not left on the
    # schedule for the run_all() calls made by the loop below.
    schedule.every(0).seconds.do(run_first_time)
    schedule.run_all()
    end = default_timer()

    # Whole seconds spent on the last pass, and whole seconds waited since it.
    processing_time, tick = round(end - start), 0

    # Each iteration is one ~1 s tick; the per-iteration overhead is not
    # compensated for, so the schedule shifts by a few msec per tick.
    while True:
        if tick + processing_time >= schedule_interval_secs:
            start = default_timer()
            # Register the job, run it immediately, then unregister it again.
            job = schedule.every(0).seconds.do(work)
            try:
                schedule.run_all()
            finally:
                # Unregister even when the pass raises, so a caller that
                # recovers from the error is not left with a stale job.
                schedule.cancel_job(job)
            end = default_timer()
            processing_time, tick = round(end - start), 0
        time.sleep(1)
        tick += 1
# Start the scheduling loop only when this file is executed as a script.
if __name__ == '__main__':
    scheduler()