Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PostgreSQL adapter #126

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cliboa/adapter/mysql.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import pymysql

from cliboa.adapter.rdbms import RdbmsSupport


class MysqlAdaptor(RdbmsSupport):

def get_connection(self, **kwargs):
# see https://pymysql.readthedocs.io/en/latest/modules/connections.html
kwargs["host"] = self._host
Expand Down
132 changes: 132 additions & 0 deletions cliboa/adapter/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from psycopg2.extras import DictCursor
from cliboa.util.lisboa_log import LisboaLog

class PostgresAdapter(object):
"""
Adapter class of PostgreSQL
"""

def __init__(self):
self._logger = LisboaLog.get_logger(__name__)
self.__cur = None
self.__con = None

def connect(self,host,user,dbname,password):
"""
Get postgres connection
"""

self.__con = psycopg2.connect(host= host , user=user,dbname=dbname, password= password)
self.__con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
return self.__con


def close(self):
"""
Release sqlite connection
"""

if self.__cur:
self.__cur.close()

self.__cur = None

if self.__con:

self.__con.close()

def fetch(self, sql, row_factory=None):
"""
Get cursor after executed query of select-related
Args:
sql (str): SQL
Returns:
cursor: cursor

"""

if self.__cur:
self.__cur.close()
self.__cur = None

self.__cur = self.__con.cursor()

if row_factory:
self.__cur.row_factory = row_factory
self.__cur.execute(sql)
return self.__cur

def create_user_func(self, dict):
"""
Create function
"""

self.__con.create_function(dict["name"], dict["args"], dict["func"])

def execute(self, sql):
"""
Execute SQL
Args:
sql (str): SQL to execute
"""

self.__con.execute(sql)

def commit(self):
"""
Commit
"""
self.__con.commit()

def execute_many_insert(
self, tblname, column_def, insert_rows, is_replace_into=True
):
"""
Execute many INSERT INTO SQL
Args:
tblname: target table
column_def: table column definition
insert_rows(dict[]) rows to be inserted
is_replace_into: when using replace into: True,
when using insert into: False
"""
if not tblname or not insert_rows:
raise ValueError("Parameters are missing")

holders = "%s " * len(column_def)

insert_sql = "REPLACE INTO" if is_replace_into else "INSERT INTO"
numbers_of_columns = int(len(holders) / 3)
sql = insert_sql + " %s (%s) VALUES (%s)" % (
tblname,
# escape all the columns with double quotes
",".join('"' + column + '"' for column in column_def),
",".join(list(holders.split(' ',numbers_of_columns)[0:numbers_of_columns]))
)
self._logger.debug("sql: %s" % sql)
values = []
for row in insert_rows:
vs = []
for c in column_def:
vs.append(row.get(c))
values.append(vs)
if len(vs) != len(column_def):
raise ValueError(
"The length of insert rows must be equal to the column definition. Column definition: %s, Insert rows: %s" # noqa
% (column_def, vs)
)

self.__con.cursor().executemany(sql, values)

def add_index(self, tblname, columns):
"""
Create index
Args:
tblname (str): target table
columns (str[]): index list
"""
idx_name = tblname + "_" + "".join(columns)
sql = "CREATE INDEX %s ON %s(%s)" % (idx_name, tblname, ",".join(columns))
self.execute(sql)
15 changes: 11 additions & 4 deletions cliboa/adapter/rdbms.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from abc import abstractmethod

from cliboa.util.exception import DatabaseException
from cliboa.util.lisboa_log import LisboaLog


class RdbmsSupport():

class RdbmsSupport:

"""
This class allows you to access a database and
provides database transaction by context manager.
Expand Down Expand Up @@ -54,8 +57,12 @@ def __exit__(self, *exc):

def _begin(self):
self._con = self.get_connection()
self._logger.info('Connected to database(host=%s, user=%s, db=%s)' %
(self._host, self._user, self._dbname))

self._logger.info(
"Connected to database(host=%s, user=%s, db=%s)"
% (self._host, self._user, self._dbname)
)


def _commit(self):
if self._con:
Expand Down Expand Up @@ -116,4 +123,4 @@ def get_connection(self, **kwargs):
"""
Returns a database connection you want to access to
"""
pass

15 changes: 10 additions & 5 deletions cliboa/scenario/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
BigQueryReadCache,
FirestoreDownloadDocument,
GcsDownload,
GcsDownloadFileDelete
GcsDownloadFileDelete,
)
from .extract.http import HttpDownload
from .extract.mysql import MysqlRead
Expand All @@ -38,21 +38,26 @@
CsvReadBigQueryCreate,
FirestoreDocumentCreate,
GcsFileUpload,
GcsUpload
GcsUpload,
)
from .load.sftp import SftpFileLoad, SftpUpload
from .load.sqlite import CsvReadSqliteCreate, SqliteCreation, SqliteWrite
from .transform.csv import CsvColumnExtract
from .transform.file import (

from .transform.csv import (
CsvColumnExtract,

ColumnLengthAdjust,
CsvColsExtract,
CsvHeaderConvert,
CsvMerge,
CsvConcat,
)
from .transform.file import (
DateFormatConvert,
ExcelConvert,
FileCompress,
FileConvert,
FileDecompress,
FileDivide,
FileRename
FileRename,
)
4 changes: 3 additions & 1 deletion cliboa/scenario/extract/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
from cliboa.scenario.rdbms import BaseRdbmsRead

from cliboa.adapter.mysql import MysqlAdaptor
from cliboa.scenario.rdbms import BaseRdbmsRead



class MysqlRead(BaseRdbmsRead):
Expand Down
123 changes: 123 additions & 0 deletions cliboa/scenario/extract/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import psycopg2.extras
import ast
kyoujuro marked this conversation as resolved.
Show resolved Hide resolved
import codecs
import csv
from cliboa.scenario.base import BasePostgres
from cliboa.scenario.validator import (EssentialParameters, IOOutput,
PostgresTableExistence)
from cliboa.util.exception import FileNotFound, PostgresInvalid


class PostgresRead(BasePostgres):

def __init__(self):
super().__init__()
"""
Implement attributes to be set in scenario.yml
"""
self._tblname = None
self._raw_query = None

def tblname(self, tblname):
self._tblname = tblname

def raw_query(self, raw_query):
self._raw_query = raw_query

def execute(self, *args):
super().execute()

"""
Implement processes which would like to do
"""
input_valid = IOInput(self._io)
input_valid()

param_valid = EssentialParameters(self.__class__.__name__, [self._tblname,self._password,self._host,self._dbname,self._password])
param_valid()

tbl_valid = PostgresTableExistence(self._host, self._user,
self._dbname, self._tblname, self._password)
tbl_valid()

def dict_factory(cursor, row):
d = {}
for i, col in enumerate(cursor.description):
d[col[0]] = row[i]
return d

def dict_f(row):
d = {}
for i, col in enumerate(colnames):
d[col[0]] = row[i]
print(row[i])
return d



conn = self._postgres_adptr.connect(self._host, self._user,
self._dbname, self._password)


cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

cur.execute(self._get_query())
results = cur.fetchall()

dict_result = []
for row in results:
dict_result.append(dict(row))


for r in dict_result:
self._s.save(r)

kyoujuro marked this conversation as resolved.
Show resolved Hide resolved
def _get_query(self):
"""
Get sql to read
"""
if self._raw_query:
return self._raw_query
sql = ""
if self._columns:
select_columns = ",".join(map(str, self._columns))
sql = "SELECT %s FROM %s" % (select_columns, self._tblname)
else:
sql = "SELECT * FROM %s" % self._tblname
return sql

class PostgresReadRow(BasePostgres):
"""
Execute query.
"""

def __init__(self):
super().__init__()

def execute(self, *args):
super().execute()

self._postgres_adptr.connect(self._host, self._user, self._dbname, self._password)
try:
cur = self._postgres_adptr.fetch(
sql=self._get_query(), row_factory=self._get_factory()
)
self._callback_handler(cur)
finally:
self._postgres_adptr.close()

def _get_factory(self):
"""
Default row factory (returns value as tuple) is used if factory is not set
"""
return None

def _get_query(self):
raise NotImplementedError("Method 'get_query' must be implemented by subclass")

def _callback_handler(self, cursor):
raise NotImplementedError(
"Method 'callback_handler' must be implemented by subclass"
)
Loading