Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 74edfb7

Browse filesBrowse files
authored
Merge pull request cid-harvard#3 from cid-harvard/feature/py2.7-compatibility
Python 2.7 compatibility changes
2 parents 9b1e3e1 + 115920e commit 74edfb7
Copy full SHA for 74edfb7

File tree

Expand file treeCollapse file tree

5 files changed

+242
-164
lines changed
Filter options
Expand file treeCollapse file tree

5 files changed

+242
-164
lines changed

‎pandas_to_postgres/_base_copy.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/_base_copy.py
+44-36Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
from .utilities import logger
2-
from io import StringIO
3-
from pandas import DataFrame
4-
from typing import Callable, List
52
from sqlalchemy.schema import AddConstraint, DropConstraint
63
from sqlalchemy.exc import SQLAlchemyError
7-
from sqlalchemy.sql.schema import Table
8-
from sqlalchemy.engine.base import Connection
94

105

116
class BaseCopy(object):
@@ -15,21 +10,26 @@ class BaseCopy(object):
1510

1611
def __init__(
1712
self,
18-
defer_sql_objs: bool = False,
19-
conn: Connection = None,
20-
table_obj: Table = None,
21-
sql_table: str = None,
22-
csv_chunksize: int = 10 ** 6,
13+
defer_sql_objs=False,
14+
conn=None,
15+
table_obj=None,
16+
sql_table=None,
17+
csv_chunksize=10 ** 6,
2318
):
2419
"""
2520
Parameters
2621
----------
27-
defer_sql_objs: multiprocessing has issue with passing SQLALchemy objects, so if
22+
defer_sql_objs: bool
23+
multiprocessing has issues passing SQLAlchemy objects, so if
2824
True, defer attributing these to the object until after pickled by Pool
29-
conn: SQLAlchemy connection managed outside of the object
30-
table_obj: SQLAlchemy object for the destination SQL Table
31-
sql_table: string of SQL table name
32-
csv_chunksize: max rows to keep in memory when generating CSV for COPY
25+
conn: SQLAlchemy Connection
26+
Managed outside of the object
27+
table_obj: SQLAlchemy Table
28+
Model object for the destination SQL Table
29+
sql_table: string
30+
SQL table name
31+
csv_chunksize: int
32+
Max rows to keep in memory when generating CSV for COPY
3333
"""
3434

3535
self.rows = 0
@@ -47,8 +47,10 @@ def instantiate_sql_objs(self, conn, table_obj):
4747
4848
Parameters
4949
----------
50-
conn: SQLAlchemy connection managed outside of the object
51-
table_obj: SQLAlchemy object for the destination SQL Table
50+
conn: SQLAlchemy Connection
51+
Managed outside of the object
52+
table_obj: SQLAlchemy Table
53+
Model object for the destination SQL Table
5254
"""
5355
self.conn = conn
5456
self.table_obj = table_obj
@@ -61,71 +63,77 @@ def drop_pk(self):
6163
Drop primary key constraints on PostgreSQL table as well as CASCADE any other
6264
constraints that may rely on the PK
6365
"""
64-
logger.info(f"Dropping {self.sql_table} primary key")
66+
logger.info("Dropping {} primary key".format(self.sql_table))
6567
try:
6668
with self.conn.begin_nested():
6769
self.conn.execute(DropConstraint(self.primary_key, cascade=True))
6870
except SQLAlchemyError:
69-
logger.info(f"{self.sql_table} primary key not found. Skipping")
71+
logger.info("{} primary key not found. Skipping".format(self.sql_table))
7072

7173
def create_pk(self):
    """
    Create the primary key constraint on the PostgreSQL table.

    Logs the table name (``self.sql_table``) at INFO level, then executes an
    ``AddConstraint`` element built from ``self.primary_key`` on ``self.conn``.
    """
    message = "Creating {} primary key".format(self.sql_table)
    logger.info(message)
    add_primary_key = AddConstraint(self.primary_key)
    self.conn.execute(add_primary_key)
7577

7678
def drop_fks(self):
    """
    Drop foreign key constraints on PostgreSQL table.

    Iterates over ``self.foreign_keys`` and drops each constraint inside its
    own nested transaction opened with ``self.conn.begin_nested()``. A
    ``SQLAlchemyError`` raised by a drop is logged as a warning and the loop
    continues with the next foreign key.
    """
    for fk in self.foreign_keys:
        # Logging cannot raise SQLAlchemyError, so it stays outside the try
        logger.info("Dropping foreign key {}".format(fk.name))
        try:
            with self.conn.begin_nested():
                self.conn.execute(DropConstraint(fk))
        except SQLAlchemyError:
            # ``warn`` is a deprecated alias; ``warning`` is the documented
            # method and exists on both Python 2.7 and 3 loggers
            logger.warning("Foreign key {} not found".format(fk.name))
8587

8688
def create_fks(self):
    """
    Create foreign key constraints on PostgreSQL table.

    Iterates over ``self.foreign_keys`` and executes an ``AddConstraint``
    element for each one on ``self.conn``. A ``SQLAlchemyError`` raised while
    adding a constraint is logged as a warning and the remaining foreign keys
    are still attempted.
    """
    for fk in self.foreign_keys:
        try:
            # The previous "{fk.name}".format(fk.name) raised KeyError('fk'):
            # a positional argument is never bound to the field name ``fk``.
            # A positional ``{}`` field formats the constraint name correctly.
            logger.info("Creating foreign key {}".format(fk.name))
            self.conn.execute(AddConstraint(fk))
        except SQLAlchemyError:
            # ``warning`` replaces the deprecated ``warn`` alias
            logger.warning("Error creating foreign key {}".format(fk.name))
9496

9597
def truncate(self):
    """
    TRUNCATE the PostgreSQL table.

    Logs the table name (``self.sql_table``) at INFO level and executes a
    ``TRUNCATE TABLE`` statement for it on ``self.conn``.
    """
    table_name = self.sql_table
    logger.info("Truncating {}".format(table_name))
    statement = "TRUNCATE TABLE {};".format(table_name)
    self.conn.execute(statement)
99101

100102
def analyze(self):
    """
    Run ANALYZE on the PostgreSQL table.

    Logs the table name (``self.sql_table``) at INFO level and executes an
    ``ANALYZE`` statement for it on ``self.conn``.
    """
    table_name = self.sql_table
    logger.info("Analyzing {}".format(table_name))
    statement = "ANALYZE {};".format(table_name)
    self.conn.execute(statement)
104106

105-
def copy_from_file(self, file_object):
    """
    COPY to PostgreSQL table using StringIO CSV object

    The file object is rewound and its first line is consumed as the column
    list of the COPY statement; the rest of the file is handed to the
    cursor's ``copy_expert`` from the position just after that header line.

    Parameters
    ----------
    file_object: StringIO
        CSV formatted data to COPY from DataFrame to PostgreSQL. The first
        line must hold the comma-separated column names.
    """
    cur = self.conn.connection.cursor()
    try:
        file_object.seek(0)
        # readline() keeps the line terminator; strip it so it does not end
        # up embedded in the column list of the generated statement
        columns = file_object.readline().rstrip("\r\n")
        sql = "COPY {table} ({columns}) FROM STDIN WITH CSV FREEZE".format(
            table=self.sql_table, columns=columns
        )
        cur.copy_expert(sql=sql, file=file_object)
    finally:
        # Release the DBAPI cursor whether or not the COPY succeeded
        cur.close()
118123

119-
def data_formatting(self, df: DataFrame, functions: List[Callable] = [], **kwargs):
124+
def data_formatting(self, df, functions=[], **kwargs):
120125
"""
121126
Call each function in the functions list arg on the DataFrame and return
122127
123128
Parameters
124129
----------
125-
df: dataframe to format
126-
functions: list of functions to apply to df. each gets passed df, self as
127-
copy_obj, and all kwargs passed to data_formatting
128-
**kwargs: kwargs to pass on to each function
130+
df: pandas DataFrame
131+
dataframe to format
132+
functions: list of functions
133+
Functions to apply to df. each gets passed df, self as copy_obj, and all
134+
kwargs passed to data_formatting
135+
**kwargs
136+
kwargs to pass on to each function
129137
"""
130138
for f in functions:
131139
df = f(df, copy_obj=self, **kwargs)

‎pandas_to_postgres/copy_df.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/copy_df.py
+22-11Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,31 @@
11
from .utilities import create_file_object, df_generator, logger, cast_pandas
22
from ._base_copy import BaseCopy
33

4-
import pandas as pd
5-
from sqlalchemy.sql.schema import Table
6-
from sqlalchemy.engine.base import Connection
7-
84

95
class DataFrameCopy(BaseCopy):
6+
"""
7+
Class for handling a standard case of iterating over a pandas DataFrame in chunks
8+
and COPYing to PostgreSQL via StringIO CSV
9+
"""
10+
1011
def __init__(
11-
self,
12-
df: pd.DataFrame,
13-
defer_sql_objs: bool = False,
14-
conn: Connection = None,
15-
table_obj: Table = None,
16-
csv_chunksize: int = 10 ** 6,
12+
self, df, defer_sql_objs=False, conn=None, table_obj=None, csv_chunksize=10 ** 6
1713
):
14+
"""
15+
Parameters
16+
----------
17+
df: pandas DataFrame
18+
Data to copy to database table
19+
defer_sql_objs: bool
20+
multiprocessing has issues passing SQLAlchemy objects, so if
21+
True, defer attributing these to the object until after pickled by Pool
22+
conn: SQLAlchemy Connection
23+
Managed outside of the object
24+
table_obj: SQLAlchemy model object
25+
Destination SQL Table
26+
csv_chunksize: int
27+
Max rows to keep in memory when generating CSV for COPY
28+
"""
1829
super().__init__(defer_sql_objs, conn, table_obj, csv_chunksize)
1930

2031
self.df = df
@@ -37,7 +48,7 @@ def copy(self, functions=[cast_pandas]):
3748
self.copy_from_file(fo)
3849
del fo
3950

40-
logger.info(f"All chunks copied ({self.rows} rows)")
51+
logger.info("All chunks copied ({} rows)".format(self.rows))
4152

4253
self.create_pk()
4354
self.create_fks()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.