-
Notifications
You must be signed in to change notification settings - Fork 0
/
cornjob.py
45 lines (40 loc) · 1.41 KB
/
cornjob.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import schedule
import time
from functions.proxy_get import ProxyGet
from functions.proxy_check import ProxyCheck
from functions.db import ProxyDatabase
from concurrent.futures import ThreadPoolExecutor
# Module-level database handle; check_proxy() and fetch_proxies() below both
# use this single shared instance.
db = ProxyDatabase()
# Executed once when the file is loaded, before any job is scheduled.
# NOTE(review): presumably creates the proxy table if it does not exist yet -
# confirm against functions.db.
db.create_table()
def check_proxy(proxy: str) -> None:
    """Check a single HTTP proxy and store it in the database if it works.

    The proxy is tested against ``google.com`` through ``ProxyCheck.check_http``.
    When the check returns a truthy result, extra metadata is looked up with
    ``get_proxy_info`` and the proxy is saved through ``db.add_proxy``.

    Args:
        proxy: Proxy address in ``ip:port`` form.

    Returns:
        None. The only effect is the optional database insert.

    Any exception raised while checking or storing is suppressed so that one
    bad proxy cannot abort a batch run, but it is logged at DEBUG level
    instead of disappearing silently.
    """
    import logging  # local import keeps this block self-contained

    try:
        check_obj = ProxyCheck()
        # NOTE(review): the third argument (30) is presumably a timeout in
        # seconds - confirm against ProxyCheck.check_http.
        result = check_obj.check_http("google.com", proxy, 30)
        if not result:
            # Falsy result: nothing is stored for this proxy.
            return

        ip, port = proxy.split(":")
        proxy_info = check_obj.get_proxy_info(proxy)
        db.add_proxy(
            ip=ip,
            port=port,
            protocol="http",
            ssl=result["ssl"],
            country=f"{proxy_info['flag']['emoji']} {proxy_info['country_code']}",
            status=result["status"],
            speed=f"{result['speed']:.2f}",
        )
    except Exception:
        # Best-effort by design: keep going, but leave a trace for debugging.
        logging.getLogger(__name__).debug(
            "check_proxy failed for %r", proxy, exc_info=True
        )
def fetch_proxies() -> None:
    """Re-validate stored proxies together with a freshly scraped batch.

    Steps:
        1. Read the proxies currently stored in the database.
        2. Scrape a new batch with ``ProxyGet("http").get_proxyscrape``.
        3. Call ``db.delete_proxy()``; passing candidates are re-inserted
           by ``check_proxy``.
        4. Run ``check_proxy`` on every unique candidate in a thread pool.

    Returns:
        None.
    """
    old_proxies = db.fetch_all_proxies()

    # Scrape BEFORE touching the table: if the scrape raises, the stored
    # proxies are still there instead of being removed with no replacement.
    proxy_obj = ProxyGet("http")
    new_proxies = proxy_obj.get_proxyscrape(10000, "all")

    # NOTE(review): called without arguments, this presumably clears every
    # stored proxy - confirm against functions.db.
    db.delete_proxy()

    stored = [f"{item['ip']}:{item['port']}" for item in old_proxies]
    # dict.fromkeys drops duplicates while keeping first-seen order, so a proxy
    # present in both lists is checked (and inserted) only once.
    proxies = list(dict.fromkeys(stored + new_proxies))

    # Leaving the ``with`` block waits for all submitted checks to finish.
    with ThreadPoolExecutor(max_workers=500) as executor:
        executor.map(check_proxy, proxies)
def cronjob_checker() -> None:
    """Schedule ``fetch_proxies`` every 5 minutes and run the scheduler loop.

    This function never returns: after registering the job it polls the
    ``schedule`` module once per second and runs whatever is due.
    """
    refresh_interval = schedule.every(5).minutes
    refresh_interval.do(fetch_proxies)

    while True:
        # Run any job whose time has come, then pause briefly before polling again.
        schedule.run_pending()
        time.sleep(1)
# Start the scheduler loop as soon as this file is executed; the call blocks
# forever because cronjob_checker() loops with `while True`.
# NOTE(review): there is no `if __name__ == "__main__":` guard, so importing
# this module also starts the loop - confirm that this is intended.
cronjob_checker()