From ec118262ebb1dbbe75269697ed22de73c3f74274 Mon Sep 17 00:00:00 2001 From: gmemstr Date: Sat, 11 Mar 2017 13:59:04 -0800 Subject: [PATCH 1/3] Starting work on converting to asynchrounous functions Some things are incredibly broken. Basically rewriting a lot of the code to work asynchronously. SQL was rewritten for smaller functions, scanning is now asynchronous (should be anyways). Compacted Slackbot a little bit, also changes icon based on number of servers down (once I've fixed all the other bugs :D). --- src/Cache.py | 109 ----------------------------------------------- src/SQL.py | 73 +++++++++++++++++++++++++++++++ src/Scan.py | 55 ++++++++++++++++++++++++ src/Slackbot.py | 24 +++++------ src/Statuses.py | 62 --------------------------- src/Webserver.py | 8 ++-- 6 files changed, 142 insertions(+), 189 deletions(-) delete mode 100644 src/Cache.py create mode 100644 src/SQL.py create mode 100644 src/Scan.py delete mode 100644 src/Statuses.py diff --git a/src/Cache.py b/src/Cache.py deleted file mode 100644 index 57f7884..0000000 --- a/src/Cache.py +++ /dev/null @@ -1,109 +0,0 @@ -# Completely rewritten with SQL in mind -import MySQLdb -import time -from src.Config import Config - -class Handler: - def __init__(self): - self.config = Config() - sqluser = self.config.Get("sql_user") - sqlpass = self.config.Get("sql_pass") - self.sqltable = self.config.Get("sql_db") - - #sqluser = "root" - #sqlpass = "" - #sqldb = "platypus" - - self.db = MySQLdb.connect(user=sqluser,passwd=sqlpass,db="platypus") - self.c = self.db.cursor() - - def CheckConnection(self): - try: - self.db.ping() - except: - self.db = MySQLdb.connect(user=sqluser,passwd=sqlpass,db="platypus") - - def Get(self,server="all",offline="all"): - self.CheckConnection() - if server == "all": - self.c.execute("SELECT * FROM "+self.sqltable) - return self.c.fetchall() - if offline == "only": - self.c.execute("SELECT * FROM "+self.sqltable+" WHERE online=false") - return self.c.fetchall() - else: - self.c.execute("SELECT * FROM "+self.sqltable+" WHERE id="+str(server)) - return self.c.fetchall() - - def SetStatus(self,panel,online,cpu,memory,disk): - self.CheckConnection() - print("Setting...") - self.c.execute("SELECT * FROM "+self.sqltable+" WHERE id="+str(panel)) - wasup = self.c.fetchone()[4] - udtime = 0 - - if online == False and wasup == 1: - print("Set offline") - # Server just went offline - self.c.execute("UPDATE "+self.sqltable+" SET online=false, udtime=0 WHERE id="+str(panel)) - self.db.commit() - - if online == True and wasup == 1: - # Refresh stats (online) - print("Still online") - self.c.execute("UPDATE "+self.sqltable+" SET online=true, udtime="+ - str(udtime + self.config.Get("scan_interval"))+", cpu="+ - cpu + ", memory="+memory+",disk="+disk+" WHERE id="+str(panel)) - self.db.commit() - - if online == False and wasup == 0: - print("Still offine") - # Do nothing (offline) - self.c.execute("UPDATE "+self.sqltable+" SET online=false, udtime="+ - str(udtime - self.config.Get("scan_interval"))+ - " WHERE id="+str(panel)) - self.db.commit() - - - if online == True and wasup == 0: - print("Set online") - # Panel just went onlie - self.c.execute("UPDATE "+self.sqltable+" SET online=true, udtime=0, cpu="+ - cpu + ", memory="+memory+",disk="+disk+" WHERE id="+str(panel)) - self.db.commit() - - def RemoveServer(self, panel): - self.CheckConnection() - - self.c.execute("DELETE FROM "+self.sqltable+" WHERE id="+str(panel)) - self.db.commit() - return True - def CreateServer(self, panel, form): - self.CheckConnection() - - # Insert new server into database - self.c.execute("INSERT INTO "+self.sqltable+" (id,name,hostname,location) VALUES (%s,%s,%s,%s)", - (int(form['id']),form['name'],form['hostname'],form['location'])) - self.db.commit() - return True - def ModServer(self, panel, form): - self.CheckConnection() - - # Edit server - self.c.execute("UPDATE "+self.sqltable+" SET name=%s,hostname=%s WHERE id=%s", - (form['name'], form['hostname'], panel)) - self.db.commit() - return True - - def GetAsJson(self,server="all",offline="all"): - raw = self.Get(server,offline) - res = {} - for s in raw: - print(s) - res[s[0]] = {"name": s[1], - "online": s[4], - "location": s[3], - "cpu": s[6], - "memory":s[7], - "disk":s[8]} - return res \ No newline at end of file diff --git a/src/SQL.py b/src/SQL.py new file mode 100644 index 0000000..0702117 --- /dev/null +++ b/src/SQL.py @@ -0,0 +1,73 @@ +import asyncio +import MySQLdb +from src.Config import Config + +class Sql: + def __init__(self): + self.Config = Config() + + self.config = Config() + self.sqluser = self.config.Get("sql_user") + self.sqlpass = self.config.Get("sql_pass") + self.sqltable = self.config.Get("sql_db") + + self.db = MySQLdb.connect(user=self.sqluser,passwd=self.sqlpass,db="platypus") + self.c = self.db.cursor() + + def CheckConnection(self): + try: + self.db.ping() + except: + self.db = MySQLdb.connect(user=self.sqluser,passwd=self.sqlpass,db="platypus") + + def Get(self,filter=None,arg=None): + self.CheckConnection() + if filter == None: + self.c.execute("SELECT * FROM " + self.sqltable) + yield self.c.fetchall() + if filter == "one": + self.c.execute("SELECT * FROM " + self.sqltable + " WHERE id=%s", (arg)) + yield self.c.fetchall() + else: + raise ValueError('Invalid filter for SQL query') + + def Set(self,panel,online,cpu,memory,disk): + self.CheckConnection() + self.c.execute("UPDATE " + self.sqltable + " SET online=%s,cpu=%s,memory=%s,disk=%s WHERE id=%s", + (online, cpu, memory, disk, panel)) + + def RemoveServer(self, panel): + self.CheckConnection() + + self.c.execute("DELETE FROM "+self.sqltable+" WHERE id="+str(panel)) + self.db.commit() + return True + def CreateServer(self, panel, form): + self.CheckConnection() + + # Insert new server into database + self.c.execute("INSERT INTO " + self.sqltable + " (id,name,hostname,location) VALUES (%s,%s,%s,%s)", + (int(form['id']),form['name'],form['hostname'],form['location'])) + self.db.commit() + return True + def ModServer(self, panel, form): + self.CheckConnection() + + # Edit server + self.c.execute("UPDATE " + self.sqltable + " SET name=%s,hostname=%s WHERE id=%s", + (form['name'], form['hostname'], panel)) + self.db.commit() + return True + + def GetAsJson(self,panel): + raw = self.Get("one",panel) + res = {} + for s in raw: + print(s) + res[s[0]] = {"name": s[1], + "online": s[4], + "location": s[3], + "cpu": s[6], + "memory":s[7], + "disk":s[8]} + return res \ No newline at end of file diff --git a/src/Scan.py b/src/Scan.py new file mode 100644 index 0000000..9790008 --- /dev/null +++ b/src/Scan.py @@ -0,0 +1,55 @@ +import asyncio +import requests +# from src.Cache import Handler +from src.SQL import Sql + +sql = Sql() + +class Scan: + def __init__(self): + self.panels = sql.Get() + + async def Fetch(self,panel=None): + if panel == None: + for p in self.panels: + result = await self.Check(p) + sql.Set(p[0], + result['online'], + str(result['cpu']), + str(result['memory']), + str(result['disk'])) + + async def Check(self, panel): + id = panel[0] + + try: + request = requests.get("http://" + panel[2] + "/platy/", timeout=2) + + print(panel[0], "online") + if request.status_code == 404: + return {"name": panel[1], + "online": True, + "cpu": 0, + "memory": 0, + "disk": 0} + else: + data = request.json() + return {"name": panel[1], + "online": True, + "cpu": data["cpu"], + "memory": data["memory"], + "disk": data["disk"]} + + except Exception as e: + print(panel[0], "offline") + print(e) + return {"name": panel[1], + "online": False, + "cpu": 0, + "memory": 0, + "disk": 0} + +s = Scan() +loop = asyncio.get_event_loop() +loop.create_task(s.Fetch()) +loop.run_forever() \ No newline at end of file diff --git a/src/Slackbot.py b/src/Slackbot.py index 67ea659..82993b9 100644 --- a/src/Slackbot.py +++ b/src/Slackbot.py @@ -1,5 +1,5 @@ from slackclient import SlackClient -from src.Cache import Handler +from src.SQL import Sql from src.Config import Config import threading @@ -7,7 +7,7 @@ channel = config.Get("slack_channel") token = config.Get("slack_api_key") sc = SlackClient(token) -handler = Handler() +sql = Sql() class Bot: def Post(self,message, channel, username, icon): @@ -16,14 +16,15 @@ def Post(self,message, channel, username, icon): username=username, icon_emoji=icon) - def BuildMessage(self,data): + def ServerReport(self,data): + servers = sql.Get() post = True off = 0 channel = config.Get("slack_channel") - username = "Platypus" icon = ":desktop_computer:" message = "Some panels may be offline!" - for s in data: + + for s in servers: if s[4] == 0: message = message + " " + s[1] + " (" + s[2] + ")" off = off + 1 @@ -31,13 +32,8 @@ def BuildMessage(self,data): if off > 1: post = True else: post = False - if post is True: self.Post(message, channel, username, icon) - + if off >= 2: icon = ":exclamation:" + if off >= 4: icon = ":fire:" - def Data(self): - data = handler.Get() - self.BuildMessage(data) - - def Loop(self): - self.Data() - threading.Timer(config.Get("slack_interval"), self.Loop).start() + username = "Platypus (" + str(off) + ")" + if post is True: self.Post(message, channel, username, icon) diff --git a/src/Statuses.py b/src/Statuses.py deleted file mode 100644 index bb0e90d..0000000 --- a/src/Statuses.py +++ /dev/null @@ -1,62 +0,0 @@ -import requests -import json -from src.Cache import Handler -import threading -from src.Config import Config - -config = Config() -handler = Handler() - -class Scanning: - - def Fetch(self,panel="all"): - if panel == "all": - s_list = handler.Get() - for s in s_list: - res = self.Scan(s) - handler.SetStatus(s[0],res['online'],res['cpu'],res['memory'],res['disk']) - return "done" - else: - s = handler.Get(panel) - return self.Scan(s[0]) - - def Scan(self, panel): - # print(panel) - id = panel[0] - res = {} - online = False - # Iterate through the list of servers - try: - # Attempts to fetch platy stats from panel. - request = requests.get("http://" + panel[2] + config.Get("stats_path"), - timeout=config.Get("scan_timeout")) - print(panel[0], "online") - online = True - if request.status_code == 404: - cpu = 0 # CPU - memory = 0 # RAM - disk = 0 # Disk - else: - data = request.json() - cpu = data["cpu"] # CPU - memory = data["memory"] # RAM - disk = data["hdd"] # Disk - except Exception as e: - print(panel[1] + " - offline") - cpu=0 - disk=0 - memory=0 - online = False - - res = {"name": panel[1], - "online": online, - "location": panel[3], - "cpu": str(cpu), - "memory":str(memory), - "disk":str(disk)} - - return res - - def Loop(self): - self.Fetch() - threading.Timer(config.Get("scan_interval"), self.Loop).start() \ No newline at end of file diff --git a/src/Webserver.py b/src/Webserver.py index fb97917..2ffc2eb 100644 --- a/src/Webserver.py +++ b/src/Webserver.py @@ -4,15 +4,15 @@ # Custom imports from src.Login import LoginManager, User -from src.Cache import Handler +from src.SQL import Sql from src.Config import Config -from src.Statuses import Scanning +from src.Scan import Scan lm = LoginManager() user = User() config = Config() -handler = Handler() -scan = Scanning() +handler = Sql() +scan = Scan() app = Flask(__name__) From 577352d4089ef390739c0266badb2c73f1d7831a Mon Sep 17 00:00:00 2001 From: gmemstr Date: Mon, 13 Mar 2017 12:54:22 -0700 Subject: [PATCH 2/3] Removed async and implemented threading instead. Decided to nix async (for now) because I have no clue how to properly implement it. Instead decided to use threading, which should work just fine (benchmarks TBD). Also fixed SQL.py not commiting to the database (derp). --- App.py | 24 +++++++++++------------- src/SQL.py | 5 +++-- src/Scan.py | 37 +++++++++++++++++++++---------------- src/Slackbot.py | 5 +++++ 4 files changed, 40 insertions(+), 31 deletions(-) diff --git a/App.py b/App.py index b3e8f5d..690ddd4 100644 --- a/App.py +++ b/App.py @@ -1,12 +1,12 @@ -from multiprocessing import Process +from threading import Thread from src.Webserver import Webserver from src.Slackbot import Bot from src.Config import Config -from src.Statuses import Scanning - +from src.Scan import Scan + config = Config() ws = Webserver() -scn = Scanning() +scn = Scan() sb = Bot() if __name__ == "__main__": @@ -22,22 +22,20 @@ # The order in which the processes are started # is important, since the flask process effectively # blocks the rest of the code from running (wtfkwbtihiw) - scnp = Process(target=scn.Loop()).start() - sbp = Process(target=sb.Loop()).start() - wsp = Process(target=ws.Run()).start() + scnp = Thread(target=scn.Fetch().start()) + sbp = Thread(target=sb.Loop()).start() + wsp = Thread(target=ws.Run()).start() elif config.Get("enable_webserver") is True: print("Webserver starting up") - scnp = Process(target=scn.Loop()).start() - wsp = Process(target=ws.Run()).start() - + scnp = Thread(target=scn.Loop()).start() + wsp = Thread(target=ws.Run()).start() elif config.Get("enable_slackbot") is True: print("Slackbot enabled") - scnp = Process(target=scn.Loop()).start() - sbp = Process(target=sb.Loop()).start() + scnp = Thread(target=scn.Loop()).start() + sbp = Thread(target=sb.Loop()).start() else: print("No frontends enabed. Scanning to cache only.") scn.Loop() - diff --git a/src/SQL.py b/src/SQL.py index 0702117..946595e 100644 --- a/src/SQL.py +++ b/src/SQL.py @@ -24,10 +24,10 @@ def Get(self,filter=None,arg=None): self.CheckConnection() if filter == None: self.c.execute("SELECT * FROM " + self.sqltable) - yield self.c.fetchall() + return self.c.fetchall() if filter == "one": self.c.execute("SELECT * FROM " + self.sqltable + " WHERE id=%s", (arg)) - yield self.c.fetchall() + return self.c.fetchall() else: raise ValueError('Invalid filter for SQL query') @@ -35,6 +35,7 @@ def Set(self,panel,online,cpu,memory,disk): self.CheckConnection() self.c.execute("UPDATE " + self.sqltable + " SET online=%s,cpu=%s,memory=%s,disk=%s WHERE id=%s", (online, cpu, memory, disk, panel)) + self.db.commit() def RemoveServer(self, panel): self.CheckConnection() diff --git a/src/Scan.py b/src/Scan.py index 9790008..e11ecc4 100644 --- a/src/Scan.py +++ b/src/Scan.py @@ -1,31 +1,37 @@ -import asyncio import requests +import threading # from src.Cache import Handler from src.SQL import Sql +import time +from src.Config import Config +config = Config() sql = Sql() class Scan: def __init__(self): self.panels = sql.Get() - async def Fetch(self,panel=None): + def Fetch(self,panel=None): + print("Fetching panels") if panel == None: for p in self.panels: - result = await self.Check(p) + result = self.Check(p) + print(result) sql.Set(p[0], - result['online'], - str(result['cpu']), - str(result['memory']), - str(result['disk'])) - - async def Check(self, panel): + result['online'], + str(result['cpu']), + str(result['memory']), + str(result['disk'])) + + def Check(self, panel): id = panel[0] try: - request = requests.get("http://" + panel[2] + "/platy/", timeout=2) + request = requests.get("http://" + panel[2] + config.Get("stats_path"), + timeout = config.Get("scan_timeout")) - print(panel[0], "online") + print(panel[0], "online", request.status_code) if request.status_code == 404: return {"name": panel[1], "online": True, @@ -38,7 +44,7 @@ async def Check(self, panel): "online": True, "cpu": data["cpu"], "memory": data["memory"], - "disk": data["disk"]} + "disk": data["hdd"]} except Exception as e: print(panel[0], "offline") @@ -49,7 +55,6 @@ async def Check(self, panel): "memory": 0, "disk": 0} -s = Scan() -loop = asyncio.get_event_loop() -loop.create_task(s.Fetch()) -loop.run_forever() \ No newline at end of file + def Loop(self): + self.Fetch() + threading.Timer(config.Get("scan_interval"), self.Loop).start() diff --git a/src/Slackbot.py b/src/Slackbot.py index 82993b9..e0843e8 100644 --- a/src/Slackbot.py +++ b/src/Slackbot.py @@ -37,3 +37,8 @@ def ServerReport(self,data): username = "Platypus (" + str(off) + ")" if post is True: self.Post(message, channel, username, icon) + + + def Loop(self): + self.Fetch() + threading.Timer(config.Get("slack_interval"), self.Loop).start() From 2f9b38edc8a746fa93a99dd07cfe60fb8e1b6584 Mon Sep 17 00:00:00 2001 From: gmemstr Date: Mon, 13 Mar 2017 13:40:20 -0700 Subject: [PATCH 3/3] Actually implemented individual panel fetching --- App.py | 2 +- src/SQL.py | 4 ++-- src/Scan.py | 5 +++++ src/Webserver.py | 7 +++++-- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/App.py b/App.py index 690ddd4..910324d 100644 --- a/App.py +++ b/App.py @@ -28,7 +28,7 @@ elif config.Get("enable_webserver") is True: print("Webserver starting up") - scnp = Thread(target=scn.Loop()).start() + #scnp = Thread(target=scn.Loop()).start() wsp = Thread(target=ws.Run()).start() elif config.Get("enable_slackbot") is True: diff --git a/src/SQL.py b/src/SQL.py index 946595e..ba41751 100644 --- a/src/SQL.py +++ b/src/SQL.py @@ -20,13 +20,13 @@ def CheckConnection(self): except: self.db = MySQLdb.connect(user=self.sqluser,passwd=self.sqlpass,db="platypus") - def Get(self,filter=None,arg=None): + def Get(self,filter=None,panel=None): self.CheckConnection() if filter == None: self.c.execute("SELECT * FROM " + self.sqltable) return self.c.fetchall() if filter == "one": - self.c.execute("SELECT * FROM " + self.sqltable + " WHERE id=%s", (arg)) + self.c.execute("SELECT * FROM " + self.sqltable + " WHERE id="+panel) return self.c.fetchall() else: raise ValueError('Invalid filter for SQL query') diff --git a/src/Scan.py b/src/Scan.py index e11ecc4..610a70c 100644 --- a/src/Scan.py +++ b/src/Scan.py @@ -24,6 +24,11 @@ def Fetch(self,panel=None): str(result['memory']), str(result['disk'])) + else: + panel = sql.Get("one", panel)[0] + print(panel) + return self.Check(panel) + def Check(self, panel): id = panel[0] diff --git a/src/Webserver.py b/src/Webserver.py index 2ffc2eb..0a41fff 100644 --- a/src/Webserver.py +++ b/src/Webserver.py @@ -1,6 +1,7 @@ # Python modules from flask import Flask, render_template, abort, g, request, redirect, url_for, jsonify import requests +from multiprocessing.pool import ThreadPool # Custom imports from src.Login import LoginManager, User @@ -28,9 +29,11 @@ def ReturnRawStats(): @app.route('/fetch/') def MiddlemanStat(panel): + pool = ThreadPool(processes=1) + # Acts as a middleman for CORS reasons - res = scan.Fetch(panel) - return jsonify(res) + async_result = pool.apply_async(scan.Fetch, (panel,)) + return jsonify(async_result.get()) @app.route("/login", methods=["GET", "POST"]) def LoginRoute():