1
1
from .utilities import logger
2
- from io import StringIO
3
- from pandas import DataFrame
4
- from typing import Callable , List
5
2
from sqlalchemy .schema import AddConstraint , DropConstraint
6
3
from sqlalchemy .exc import SQLAlchemyError
7
- from sqlalchemy .sql .schema import Table
8
- from sqlalchemy .engine .base import Connection
9
4
10
5
11
6
class BaseCopy (object ):
@@ -15,21 +10,26 @@ class BaseCopy(object):
15
10
16
11
def __init__ (
17
12
self ,
18
- defer_sql_objs : bool = False ,
19
- conn : Connection = None ,
20
- table_obj : Table = None ,
21
- sql_table : str = None ,
22
- csv_chunksize : int = 10 ** 6 ,
13
+ defer_sql_objs = False ,
14
+ conn = None ,
15
+ table_obj = None ,
16
+ sql_table = None ,
17
+ csv_chunksize = 10 ** 6 ,
23
18
):
24
19
"""
25
20
Parameters
26
21
----------
27
- defer_sql_objs: multiprocessing has issue with passing SQLALchemy objects, so if
22
+ defer_sql_objs: bool
23
+ multiprocessing has issue with passing SQLALchemy objects, so if
28
24
True, defer attributing these to the object until after pickled by Pool
29
- conn: SQLAlchemy connection managed outside of the object
30
- table_obj: SQLAlchemy object for the destination SQL Table
31
- sql_table: string of SQL table name
32
- csv_chunksize: max rows to keep in memory when generating CSV for COPY
25
+ conn: SQLAlchemy Connection
26
+ Managed outside of the object
27
+ table_obj: SQLAlchemy Table
28
+ Model object for the destination SQL Table
29
+ sql_table: string
30
+ SQL table name
31
+ csv_chunksize: int
32
+ Max rows to keep in memory when generating CSV for COPY
33
33
"""
34
34
35
35
self .rows = 0
@@ -47,8 +47,10 @@ def instantiate_sql_objs(self, conn, table_obj):
47
47
48
48
Parameters
49
49
----------
50
- conn: SQLAlchemy connection managed outside of the object
51
- table_obj: SQLAlchemy object for the destination SQL Table
50
+ conn: SQLAlchemy Connection
51
+ Managed outside of the object
52
+ table_obj: SQLAlchemy Table
53
+ Model object for the destination SQL Table
52
54
"""
53
55
self .conn = conn
54
56
self .table_obj = table_obj
@@ -61,71 +63,77 @@ def drop_pk(self):
61
63
Drop primary key constraints on PostgreSQL table as well as CASCADE any other
62
64
constraints that may rely on the PK
63
65
"""
64
- logger .info (f "Dropping { self . sql_table } primary key" )
66
+ logger .info ("Dropping {} primary key" . format ( self . sql_table ) )
65
67
try :
66
68
with self .conn .begin_nested ():
67
69
self .conn .execute (DropConstraint (self .primary_key , cascade = True ))
68
70
except SQLAlchemyError :
69
- logger .info (f" { self . sql_table } primary key not found. Skipping" )
71
+ logger .info ("{ } primary key not found. Skipping". format ( self . sql_table ) )
70
72
71
73
def create_pk (self ):
72
74
"""Create primary key constraints on PostgreSQL table"""
73
- logger .info (f "Creating { self . sql_table } primary key" )
75
+ logger .info ("Creating {} primary key" . format ( self . sql_table ) )
74
76
self .conn .execute (AddConstraint (self .primary_key ))
75
77
76
78
def drop_fks (self ):
77
79
"""Drop foreign key constraints on PostgreSQL table"""
78
80
for fk in self .foreign_keys :
79
- logger .info (f "Dropping foreign key { fk .name } " )
81
+ logger .info ("Dropping foreign key {}" . format ( fk .name ) )
80
82
try :
81
83
with self .conn .begin_nested ():
82
84
self .conn .execute (DropConstraint (fk ))
83
85
except SQLAlchemyError :
84
- logger .warn (f "Foreign key { fk . name } not found" )
86
+ logger .warn ("Foreign key {} not found" . format ( fk . name ) )
85
87
86
88
def create_fks (self ):
87
89
"""Create foreign key constraints on PostgreSQL table"""
88
90
for fk in self .foreign_keys :
89
91
try :
90
- logger .info (f "Creating foreign key { fk .name } " )
92
+ logger .info ("Creating foreign key {fk.name}" . format ( fk . name ) )
91
93
self .conn .execute (AddConstraint (fk ))
92
94
except SQLAlchemyError :
93
- logger .warn (f "Error creating foreign key { fk .name } " )
95
+ logger .warn ("Error creating foreign key {fk.name}" . format ( fk . name ) )
94
96
95
97
def truncate (self ):
96
98
"""TRUNCATE PostgreSQL table"""
97
- logger .info (f "Truncating { self .sql_table } " )
98
- self .conn .execute (f "TRUNCATE TABLE { self . sql_table } ;" )
99
+ logger .info ("Truncating {}" . format ( self .sql_table ) )
100
+ self .conn .execute ("TRUNCATE TABLE {};" . format ( self . sql_table ) )
99
101
100
102
def analyze (self ):
101
103
"""Run ANALYZE on PostgreSQL table"""
102
- logger .info (f "Analyzing { self .sql_table } " )
103
- self .conn .execute (f "ANALYZE { self . sql_table } ;" )
104
+ logger .info ("Analyzing {}" . format ( self .sql_table ) )
105
+ self .conn .execute ("ANALYZE {};" . format ( self . sql_table ) )
104
106
105
- def copy_from_file (self , file_object : StringIO ):
107
+ def copy_from_file (self , file_object ):
106
108
"""
107
109
COPY to PostgreSQL table using StringIO CSV object
108
110
109
111
Parameters
110
112
----------
111
- file_object: CSV formatted data to COPY from DataFrame to PostgreSQL
113
+ file_object: StringIO
114
+ CSV formatted data to COPY from DataFrame to PostgreSQL
112
115
"""
113
116
cur = self .conn .connection .cursor ()
114
117
file_object .seek (0 )
115
118
columns = file_object .readline ()
116
- sql = f"COPY { self .sql_table } ({ columns } ) FROM STDIN WITH CSV FREEZE"
119
+ sql = "COPY {table} ({columns}) FROM STDIN WITH CSV FREEZE" .format (
120
+ table = self .sql_table , columns = columns
121
+ )
117
122
cur .copy_expert (sql = sql , file = file_object )
118
123
119
- def data_formatting (self , df : DataFrame , functions : List [ Callable ] = [], ** kwargs ):
124
+ def data_formatting (self , df , functions = [], ** kwargs ):
120
125
"""
121
126
Call each function in the functions list arg on the DataFrame and return
122
127
123
128
Parameters
124
129
----------
125
- df: dataframe to format
126
- functions: list of functions to apply to df. each gets passed df, self as
127
- copy_obj, and all kwargs passed to data_formatting
128
- **kwargs: kwargs to pass on to each function
130
+ df: pandas DataFrame
131
+ dataframe to format
132
+ functions: list of functions
133
+ Functions to apply to df. each gets passed df, self as copy_obj, and all
134
+ kwargs passed to data_formatting
135
+ **kwargs
136
+ kwargs to pass on to each function
129
137
"""
130
138
for f in functions :
131
139
df = f (df , copy_obj = self , ** kwargs )
0 commit comments