-
Notifications
You must be signed in to change notification settings - Fork 0
/
job_monitor.py
65 lines (54 loc) · 1.71 KB
/
job_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import time
from kubernetes import client, config, watch
import mysql.connector
from mysql.connector import Error
# attempt to load in-cluster config, or else it's a docker setup
try:
config.load_incluster_config()
except config.ConfigException:
config.load_kube_config()
# load batch API
batch_v1 = client.BatchV1Api()
w = watch.Watch()
def update_job_status_in_db(job_name, status):
try:
connection = mysql.connector.connect(
host='mariadb',
database='exampledb',
user='exampleuser',
password='examplepass'
)
if connection.is_connected():
cursor = connection.cursor()
cursor.execute("""
INSERT INTO job_status (job_name, status)
VALUES (%s, %s)
ON DUPLICATE KEY UPDATE status=%s
""", (job_name, status, status))
connection.commit()
except Error as e:
print("Error while connecting to MariaDB", e)
finally:
if connection.is_connected():
cursor.close()
connection.close()
def watch_jobs():
for event in w.stream(batch_v1.list_namespaced_job, namespace='default'):
job = event['object']
job_name = job.metadata.name
if job.status.succeeded is not None:
status = 'Complete'
elif job.status.failed is not None:
status = 'Failed'
else:
status = 'Running'
print(f"Job {job_name} status: {status}")
update_job_status_in_db(job_name, status)
def main():
while True:
try:
watch_jobs()
except Error as e:
print("Error in processing loop", e)
if __name__ == "__main__":
main()