Skip to content

Commit

Permalink
Merge pull request #81 from zurdi15/refactor/models_codebase
Browse files Browse the repository at this point in the history
Refactor/models codebase
  • Loading branch information
zurdi15 committed Mar 29, 2023
2 parents 6c3a631 + 88029e5 commit d103fd6
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 177 deletions.
7 changes: 7 additions & 0 deletions backend/src/handler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from handler.db_handler import DBHandler
from handler.igdb_handler import IGDBHandler
from handler.sgdb_handler import SGDBHandler

igdbh: IGDBHandler = IGDBHandler()
sgdbh: SGDBHandler = SGDBHandler()
dbh: DBHandler = DBHandler()
40 changes: 12 additions & 28 deletions backend/src/handler/db_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,69 +23,53 @@ def wrapper(*args):
return wrapper


def add_platform(self, **kargs) -> None:
with Session.begin() as session:
session.merge(Platform(**kargs))


def add_platform(self, **kargs) -> None:
with Session.begin() as session:
session.merge(Platform(**kargs))


def get_platforms(self) -> list[Platform]:
def add_platform(self, Platform: Platform) -> None:
try:
with Session.begin() as session:
return session.scalars(select(Platform).order_by(Platform.slug.asc())).all()
session.merge(Platform)
except ProgrammingError as e:
raise HTTPException(status_code=404, detail=f"Platforms table not found: {e}")


def add_platform(self, **kargs) -> None:
def get_platforms(self) -> list[Platform]:
try:
with Session.begin() as session:
session.merge(Platform(**kargs))
return session.scalars(select(Platform).order_by(Platform.slug.asc())).all()
except ProgrammingError as e:
raise HTTPException(status_code=404, detail=f"Roms table not found: {e}")
raise HTTPException(status_code=404, detail=f"Platforms table not found: {e}")

def add_rom(self, rom: Rom) -> None:
with Session.begin() as session:
session.merge(rom)

def get_roms(self, p_slug: str) -> list[Rom]:
with Session.begin() as session:
return session.scalars(select(Rom).filter_by(p_slug=p_slug).order_by(Rom.filename.asc())).all()


def get_rom(self, p_slug: str, filename: str) -> Rom:
with Session.begin() as session:
return session.scalars(select(Rom).filter_by(p_slug=p_slug, filename=filename)).first()


def add_rom(self, **kargs) -> None:
with Session.begin() as session:
session.merge(Rom(**kargs))


def update_rom(self, p_slug: str, filename: str, data: dict) -> None:
with Session.begin() as session:
session.query(Rom) \
.filter(Rom.p_slug==p_slug, Rom.filename==filename) \
.update(data, synchronize_session='evaluate')


def delete_rom(self, p_slug: str, filename: str) -> None:
with Session.begin() as session:
session.query(Rom) \
.filter(Rom.p_slug==p_slug, Rom.filename==filename) \
.delete(synchronize_session='evaluate')


def purge_platforms(self, platforms: list) -> None:
def purge_platforms(self, platforms: list[str]) -> None:
log.info("Purging platforms")
with Session.begin() as session:
session.query(Platform) \
.filter(Platform.slug.not_in(platforms)) \
.delete(synchronize_session='evaluate')


def purge_roms(self, p_slug: str, roms: list) -> None:
def purge_roms(self, p_slug: str, roms: list[dict]) -> None:
log.info(f"Purging {p_slug} roms")
with Session.begin() as session:
session.query(Rom) \
.filter(Rom.p_slug==p_slug, Rom.filename.not_in([rom['filename'] for rom in roms])) \
Expand Down
27 changes: 13 additions & 14 deletions backend/src/handler/igdb_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,30 @@ def wrapper(*args):
def get_platform_details(self, slug: str) -> tuple:
igdb_id: str = ""
name: str = ""
url_logo: str = ""
try:
res_details: dict = requests.post("https://api.igdb.com/v4/platforms/", headers=self.headers,
data=f"fields id, name, platform_logo; where slug=\"{slug}\";").json()[0]
data=f"fields id, name; where slug=\"{slug}\";").json()[0]
igdb_id = res_details['id']
name = res_details['name']
except IndexError:
log.warning("platform not found in igdb")
if not name: name = slug
return igdb_id, name, url_logo
return {'igdb_id': igdb_id, 'name': name}


@check_twitch_token
def get_rom_details(self, filename: str, p_igdb_id: int, r_igdb_id: str) -> dict:
def get_rom_details(self, filename: str, p_igdb_id: int, r_igdb_id_search: str) -> dict:
filename_no_ext: str = filename.split('.')[0]
igdb_id: str = ""
r_igdb_id: str = ""
slug: str = ""
name: str = ""
summary: str = ""
url_cover: str = ""

if r_igdb_id:
if r_igdb_id_search:
res_details: dict = requests.post("https://api.igdb.com/v4/games/", headers=self.headers,
data=f"fields id, slug, name, summary; where id={r_igdb_id};").json()[0]
igdb_id = res_details['id']
data=f"fields id, slug, name, summary; where id={r_igdb_id_search};").json()[0]
r_igdb_id = res_details['id']
slug = res_details['slug']
name = res_details['name']
try:
Expand All @@ -71,7 +70,7 @@ def get_rom_details(self, filename: str, p_igdb_id: int, r_igdb_id: str) -> dict

res_details: dict = requests.post("https://api.igdb.com/v4/games/", headers=self.headers,
data=f"search \"{search_term}\";fields id, slug, name, summary; where platforms=[{p_igdb_id}] & category=0;").json()[0]
igdb_id = res_details['id']
r_igdb_id = res_details['id']
slug = res_details['slug']
name = res_details['name']
try:
Expand All @@ -82,7 +81,7 @@ def get_rom_details(self, filename: str, p_igdb_id: int, r_igdb_id: str) -> dict
try:
res_details: dict = requests.post("https://api.igdb.com/v4/games/", headers=self.headers,
data=f"search \"{search_term}\";fields name, id, slug, summary; where platforms=[{p_igdb_id}] & category=10;").json()[0]
igdb_id = res_details['id']
r_igdb_id = res_details['id']
slug = res_details['slug']
name = res_details['name']
try:
Expand All @@ -93,7 +92,7 @@ def get_rom_details(self, filename: str, p_igdb_id: int, r_igdb_id: str) -> dict
try:
res_details: dict = requests.post("https://api.igdb.com/v4/games/", headers=self.headers,
data=f"search \"{search_term}\";fields name, id, slug, summary; where platforms=[{p_igdb_id}];").json()[0]
igdb_id = res_details['id']
r_igdb_id = res_details['id']
slug = res_details['slug']
name = res_details['name']
try:
Expand All @@ -102,15 +101,15 @@ def get_rom_details(self, filename: str, p_igdb_id: int, r_igdb_id: str) -> dict
pass
except IndexError:
log.warning(f"{filename} rom not found in igdb")
if igdb_id:
if r_igdb_id:
try:
res_details: dict = requests.post("https://api.igdb.com/v4/covers/", headers=self.headers,
data=f"fields url; where game={igdb_id};").json()[0]
data=f"fields url; where game={r_igdb_id};").json()[0]
url_cover: str = f"https:{res_details['url']}"
except IndexError:
log.warning(f"{name} cover not found in igdb")
if not name: name = filename_no_ext
return (igdb_id, filename_no_ext, slug, name, summary, url_cover)
return (r_igdb_id, filename_no_ext, slug, name, summary, url_cover)


@check_twitch_token
Expand Down
36 changes: 16 additions & 20 deletions backend/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,18 @@
import uvicorn

from logger.logger import log
from handler.igdb_handler import IGDBHandler
from handler.sgdb_handler import SGDBHandler
from handler.db_handler import DBHandler
from handler import igdbh, dbh
from config.config import DEV_PORT, DEV_HOST
from models.platform import Platform
from utils import fs, fastapi


app = FastAPI()
fastapi.allow_cors(app)

igdbh: IGDBHandler = IGDBHandler()
sgdbh: SGDBHandler = SGDBHandler()
dbh: DBHandler = DBHandler()


@app.patch("/platforms/{p_slug}/roms/{filename}")
async def updateRom(req: Request, p_slug: str, filename: str):
async def updateRom(req: Request, p_slug: str, filename: str) -> dict:
"""Updates rom details"""

data: dict = await req.json()
Expand All @@ -42,7 +37,7 @@ async def updateRom(req: Request, p_slug: str, filename: str):


@app.delete("/platforms/{p_slug}/roms/{filename}")
async def delete_rom(p_slug: str, filename: str, filesystem: bool=False):
async def delete_rom(p_slug: str, filename: str, filesystem: bool=False) -> dict:
"""Detele rom from filesystem and database"""

log.info("deleting rom...")
Expand All @@ -52,45 +47,46 @@ async def delete_rom(p_slug: str, filename: str, filesystem: bool=False):


@app.get("/platforms/{p_slug}/roms/{filename}")
async def rom(p_slug: str, filename: str):
async def rom(p_slug: str, filename: str) -> dict:
"""Returns one rom data of the desired platform"""

return {'data': dbh.get_rom(p_slug, filename)}


@app.get("/platforms/{p_slug}/roms")
async def roms(p_slug: str):
async def roms(p_slug: str) -> dict:
"""Returns all roms of the desired platform"""

return {'data': dbh.get_roms(p_slug)}


@app.get("/platforms")
async def platforms():
async def platforms() -> dict:
"""Returns platforms data"""

return {'data': dbh.get_platforms()}


@app.put("/scan")
async def scan(req: Request, overwrite: bool=False):
async def scan(req: Request, overwrite: bool=False) -> dict:
"""Scan platforms and roms and write them in database."""

log.info("complete scaning...")
fs.store_default_resources(overwrite)
data: dict = await req.json()
platforms = data['platforms'] if data['platforms'] else fs.get_platforms()
platforms: list[str] = data['platforms'] if data['platforms'] else fs.get_platforms()
for p_slug in platforms:
p_igdb_id: str = fastapi.scan_platform(overwrite, p_slug, igdbh, dbh)
for rom in fs.get_roms(p_slug):
fastapi.scan_rom(overwrite, rom, p_igdb_id, p_slug, igdbh, dbh)
fastapi.purge(dbh, p_slug=p_slug)
fastapi.purge(dbh)
platform: Platform = fastapi.scan_platform(p_slug)
roms: list[dict] = fs.get_roms(p_slug)
for rom in roms:
fastapi.scan_rom(platform, rom)
dbh.purge_roms(p_slug, roms)
dbh.purge_platforms(fs.get_platforms())
return {'msg': 'success'}


@app.put("/search/roms/igdb")
async def search_rom_igdb(req: Request):
async def search_rom_igdb(req: Request) -> dict:
"""Get all the roms matched from igdb."""

data: dict = await req.json()
Expand Down
75 changes: 35 additions & 40 deletions backend/src/utils/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from handler.db_handler import DBHandler
from handler.igdb_handler import IGDBHandler
from handler import igdbh, dbh
from utils import fs
from models.platform import Platform
from models.rom import Rom
from logger.logger import log


Expand All @@ -18,47 +19,41 @@ def allow_cors(app: FastAPI) -> None:
log.info("CORS enabled")


def scan_platform(overwrite: bool, p_slug: str, igdbh: IGDBHandler, dbh: DBHandler) -> str:
platform: dict = {}
def scan_platform(p_slug: str) -> Platform:
"""Get platform details from IGDB if possible
Args:
p_slug: short name of the platform
Returns
Platform object
"""
log.info(f"Getting {p_slug} details")
p_igdb_id, p_name, url_logo = igdbh.get_platform_details(p_slug)
platform['slug'] = p_slug
platform['igdb_id'] = p_igdb_id
platform['name'] = p_name
#TODO: refactor logo details logic
if (overwrite or not fs.p_logo_exists(p_slug)) and url_logo:
fs.store_p_logo(p_slug, url_logo)
if fs.p_logo_exists(p_slug):
platform['path_logo'] = fs.get_p_path_logo(p_slug)
platform['n_roms'] = len(fs.get_roms(p_slug))
dbh.add_platform(**platform)
return p_igdb_id
platform_attrs: dict = igdbh.get_platform_details(p_slug)
platform_attrs['slug'] = p_slug
platform_attrs['path_logo'] = ''
platform_attrs['n_roms'] = fs.get_roms(p_slug, only_amount=True)
platform = Platform(**platform_attrs)
dbh.add_platform(platform)
return platform


def scan_rom(overwrite: bool, rom, p_igdb_id: str, p_slug: str, igdbh: IGDBHandler, dbh: DBHandler, r_igbd_id: str = '') -> None:
def scan_rom(platform: Platform, rom: dict, r_igbd_id_search: str = '', overwrite: bool = False) -> None:
log.info(f"Getting {rom['filename']} details")
r_igdb_id, filename_no_ext, r_slug, r_name, summary, url_cover = igdbh.get_rom_details(rom['filename'], p_igdb_id, r_igbd_id)
path_cover_s, path_cover_l, has_cover = fs.get_cover_details(overwrite, p_slug, filename_no_ext, url_cover)
rom: dict = {
'filename': rom['filename'], 'filename_no_ext': filename_no_ext, 'size': rom['size'],
'r_igdb_id': r_igdb_id, 'p_igdb_id': p_igdb_id,
'name': r_name, 'r_slug': r_slug, 'p_slug': p_slug,
r_igdb_id, filename_no_ext, r_slug, r_name, summary, url_cover = igdbh.get_rom_details(rom['filename'], platform.igdb_id, r_igbd_id_search)
path_cover_s, path_cover_l, has_cover = fs.get_cover_details(overwrite, platform.slug, filename_no_ext, url_cover)
rom_attrs: dict = {
'filename': rom['filename'],
'filename_no_ext': filename_no_ext,
'size': rom['size'],
'r_igdb_id': r_igdb_id,
'p_igdb_id': platform.igdb_id,
'name': r_name,
'r_slug': r_slug,
'p_slug': platform.slug,
'summary': summary,
'path_cover_s': path_cover_s, 'path_cover_l': path_cover_l, 'has_cover': has_cover
'path_cover_s': path_cover_s,
'path_cover_l': path_cover_l,
'has_cover': has_cover
}
dbh.add_rom(**rom)


def purge(dbh: DBHandler, p_slug: str = '') -> None:
"""Clean the database from non existent platforms or roms"""
if p_slug:
# Purge only roms in platform
log.info(f"Purge {p_slug}")
dbh.purge_roms(p_slug, fs.get_roms(p_slug))
else:
# Purge all platforms / delete non existent platforms and non existen roms
platforms: list = fs.get_platforms()
dbh.purge_platforms(platforms)
for p_slug in platforms:
log.info(f"Purge {p_slug}")
dbh.purge_roms(p_slug, fs.get_roms(p_slug))
rom = Rom(**rom_attrs)
dbh.add_rom(rom)
Loading

0 comments on commit d103fd6

Please sign in to comment.