Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 0a34ac3

Browse filesBrowse files
committed
Refactor to BaseCopy class, other refactoring to make multiprocessing work with HDFTableCopy objects
1 parent b3e2d45 commit 0a34ac3
Copy full SHA for 0a34ac3

File tree

Expand file treeCollapse file tree

5 files changed

+193
-212
lines changed
Filter options
Expand file treeCollapse file tree

5 files changed

+193
-212
lines changed

‎pandas_to_postgres/_base_copy.py

Copy file name to clipboard
+89Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
from .utilities import (
2+
create_file_object,
3+
df_generator,
4+
logger,
5+
classification_to_pandas,
6+
cast_pandas,
7+
add_level_metadata,
8+
HDFMetadata,
9+
)
10+
11+
import pandas as pd
12+
from sqlalchemy.schema import AddConstraint, DropConstraint
13+
from sqlalchemy.exc import SQLAlchemyError
14+
from sqlalchemy.sql.schema import Table
15+
from sqlalchemy.engine.base import Connection
16+
17+
18+
class BaseCopy(object):
    """
    Parent class for table copy operations: holds the SQLAlchemy
    connection/table objects and provides constraint management
    (drop/create PK and FKs), TRUNCATE/ANALYZE, and the COPY FROM STDIN
    call shared by the DataFrame and HDF copiers.
    """

    def __init__(
        self,
        defer_sql_objs: bool = False,
        conn=None,
        table_obj=None,
        sql_table=None,
        csv_chunksize: int = 10 ** 6,
    ):
        """
        Parameters
        ----------
        defer_sql_objs: if True, do not fetch SQLAlchemy objects now; call
            instantiate_sql_objs() after unpickling (multiprocessing support)
        conn: SQLAlchemy Connection (required unless defer_sql_objs)
        table_obj: SQLAlchemy Table object (required unless defer_sql_objs)
        sql_table: table name, used only when defer_sql_objs is True
        csv_chunksize: max rows per CSV chunk written to the file object
        """
        self.rows = 0
        self.columns = None
        self.csv_chunksize = csv_chunksize

        if not defer_sql_objs:
            self.instantiate_sql_objs(conn, table_obj)
        else:
            self.sql_table = sql_table

    def instantiate_sql_objs(self, conn, table_obj):
        """
        When using multiprocessing, pickling of SQLAlchemy objects in __init__ causes
        issues, so allow for deferring until after the pickling to fetch SQLAlchemy objs
        """
        self.conn = conn
        self.table_obj = table_obj
        self.sql_table = table_obj.name
        self.primary_key = table_obj.primary_key
        self.foreign_keys = table_obj.foreign_key_constraints

    def drop_pk(self):
        """Drop the table's primary key constraint, skipping if absent."""
        logger.info(f"Dropping {self.sql_table} primary key")
        try:
            # begin_nested (SAVEPOINT) so a failed drop doesn't poison the
            # surrounding transaction
            with self.conn.begin_nested():
                self.conn.execute(DropConstraint(self.primary_key, cascade=True))
        except SQLAlchemyError:
            logger.info(f"{self.sql_table} primary key not found. Skipping")

    def create_pk(self):
        """Re-create the table's primary key constraint."""
        logger.info(f"Creating {self.sql_table} primary key")
        self.conn.execute(AddConstraint(self.primary_key))

    def drop_fks(self):
        """Drop all foreign key constraints, skipping any that are absent."""
        for fk in self.foreign_keys:
            logger.info(f"Dropping foreign key {fk.name}")
            try:
                with self.conn.begin_nested():
                    self.conn.execute(DropConstraint(fk))
            except SQLAlchemyError:
                # logger.warn is deprecated; logger.warning is the supported name
                logger.warning(f"Foreign key {fk.name} not found")

    def create_fks(self):
        """Re-create all foreign key constraints, logging failures."""
        for fk in self.foreign_keys:
            try:
                logger.info(f"Creating foreign key {fk.name}")
                self.conn.execute(AddConstraint(fk))
            except SQLAlchemyError:
                logger.warning(f"Error creating foreign key {fk.name}")

    def truncate(self):
        """TRUNCATE the table (required for COPY ... FREEZE to apply)."""
        logger.info(f"Truncating {self.sql_table}")
        self.conn.execute(f"TRUNCATE TABLE {self.sql_table};")

    def analyze(self):
        """ANALYZE the table so the planner has fresh statistics after load."""
        logger.info(f"Analyzing {self.sql_table}")
        self.conn.execute(f"ANALYZE {self.sql_table};")

    def copy_from_file(self, file_object):
        """
        COPY a CSV file object into the table using the underlying psycopg2
        cursor's copy_expert (raw DBAPI connection, bypassing SQLAlchemy).
        """
        cur = self.conn.connection.cursor()
        cols = ", ".join(str(col) for col in self.columns)
        sql = f"COPY {self.sql_table} ({cols}) FROM STDIN WITH CSV HEADER FREEZE"
        cur.copy_expert(sql=sql, file=file_object)

‎pandas_to_postgres/copy_df.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/copy_df.py
+9-59Lines changed: 9 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -6,79 +6,29 @@
66
add_level_metadata,
77
)
88

9+
from ._base_copy import BaseCopy
10+
911
import pandas as pd
1012
from sqlalchemy.sql.schema import Table
1113
from sqlalchemy.engine.base import Connection
12-
from sqlalchemy.schema import AddConstraint, DropConstraint
13-
from sqlalchemy.exc import SQLAlchemyError
1414

1515

16-
class DataFrameCopy(object):
16+
class DataFrameCopy(BaseCopy):
    """
    Copy an in-memory pandas DataFrame into a PostgreSQL table.
    Constraint management, TRUNCATE/ANALYZE, and the COPY call are
    inherited from BaseCopy.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        defer_sql_objs: bool = False,
        conn: Connection = None,
        table_obj: Table = None,
        csv_chunksize: int = 10 ** 6,
        levels: dict = None,
    ):
        """
        Parameters
        ----------
        df: the DataFrame to copy
        defer_sql_objs: if True, defer SQLAlchemy object setup until after
            pickling (multiprocessing support)
        conn: SQLAlchemy Connection
        table_obj: SQLAlchemy Table object
        csv_chunksize: max rows per CSV chunk written to the file object
        levels: classification level metadata to add to the DataFrame
        """
        # Bug fix: the original called BaseCopy(defer_sql_objs, conn,
        # table_obj, csv_chunksize), which (a) constructed a throwaway
        # BaseCopy instead of initializing self, and (b) passed
        # csv_chunksize into BaseCopy's sql_table positional slot.
        BaseCopy.__init__(
            self, defer_sql_objs, conn, table_obj, csv_chunksize=csv_chunksize
        )

        self.df = df
        self.levels = levels
        self.columns = self.df.columns
        self.rows = self.df.shape[0]

8333
def format_df(self):
8434
# Handle NaN --> None type casting

‎pandas_to_postgres/copy_hdf.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/copy_hdf.py
+66-72Lines changed: 66 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,91 +5,49 @@
55
classification_to_pandas,
66
cast_pandas,
77
add_level_metadata,
8+
HDFMetadata,
89
)
910

10-
import pandas as pd
11-
from sqlalchemy.schema import AddConstraint, DropConstraint
12-
from sqlalchemy.exc import SQLAlchemyError
13-
11+
from ._base_copy import BaseCopy
1412

15-
class HDFTableCopy(object):
16-
17-
rows = 0
18-
columns = None
13+
import pandas as pd
14+
from sqlalchemy.sql.schema import Table
15+
from sqlalchemy.engine.base import Connection
16+
17+
18+
class HDFTableCopy(BaseCopy):
19+
def __init__(
    self,
    hdf_tables: list,
    hdf_meta: HDFMetadata,
    defer_sql_objs: bool = False,
    conn=None,
    table_obj=None,
    sql_table=None,
    csv_chunksize: int = 10 ** 6,
):
    """
    Parameters
    ----------
    hdf_tables: HDF keys whose data feeds this SQL table
    hdf_meta: HDFMetadata describing the source HDF file
    defer_sql_objs: if True, defer SQLAlchemy object setup until after
        pickling (multiprocessing support)
    conn: SQLAlchemy Connection
    table_obj: SQLAlchemy Table object
    sql_table: destination table name (used when defer_sql_objs is True)
    csv_chunksize: max rows per CSV chunk written to the file object
    """
    BaseCopy.__init__(
        self,
        defer_sql_objs=defer_sql_objs,
        conn=conn,
        table_obj=table_obj,
        sql_table=sql_table,
        csv_chunksize=csv_chunksize,
    )

    self.hdf_tables = hdf_tables

    # Info from the HDFMetadata object
    self.levels = hdf_meta.levels
    self.file_name = hdf_meta.file_name
    self.hdf_chunksize = hdf_meta.chunksize
2939

30-
def table_metadata(self):
31-
self.table_obj = db.metadata.tables[self.sql_table]
32-
self.primary_key = self.table_obj.primary_key
33-
self.foreign_keys = self.table_obj.foreign_key_constraints
34-
35-
def set_conn(self, conn):
36-
self.conn = conn
37-
38-
def delete_conn(self):
39-
del self.conn
40-
41-
def drop_pk(self):
42-
logger.info(f"Dropping {self.sql_table} primary key")
43-
try:
44-
with self.conn.begin_nested():
45-
self.conn.execute(DropConstraint(self.primary_key, cascade=True))
46-
except SQLAlchemyError:
47-
logger.info(f"{self.sql_table} primary key not found. Skipping")
48-
49-
def create_pk(self):
50-
logger.info(f"Creating {self.sql_table} primary key")
51-
self.conn.execute(AddConstraint(self.primary_key))
52-
53-
def drop_fks(self):
54-
for fk in self.foreign_keys:
55-
logger.info(f"Dropping foreign key {fk.name}")
56-
try:
57-
with self.conn.begin_nested():
58-
self.conn.execute(DropConstraint(fk))
59-
except SQLAlchemyError:
60-
logger.warn(f"Foreign key {fk.name} not found")
61-
62-
def create_fks(self):
63-
for fk in self.foreign_keys:
64-
try:
65-
logger.info(f"Creating foreign key {fk.name}")
66-
self.conn.execute(AddConstraint(fk))
67-
except SQLAlchemyError:
68-
logger.warn(f"Error creating foreign key {fk.name}")
69-
70-
def truncate(self):
71-
logger.info(f"Truncating {self.sql_table}")
72-
self.conn.execute(f"TRUNCATE TABLE {self.sql_table};")
73-
74-
def analyze(self):
75-
logger.info(f"Analyzing {self.sql_table}")
76-
self.conn.execute(f"ANALYZE {self.sql_table};")
77-
78-
def copy_from_file(self, file_object):
79-
cur = self.conn.connection.cursor()
80-
cols = ", ".join([f"{col}" for col in self.columns])
81-
sql = f"COPY {self.sql_table} ({cols}) FROM STDIN WITH CSV HEADER FREEZE"
82-
cur.copy_expert(sql=sql, file=file_object)
83-
8440
def copy_table(self):
    """
    Full copy pipeline for one table: drop constraints, load the data,
    restore constraints, then refresh planner statistics.
    """
    # Constraints must be gone before TRUNCATE/COPY for speed and FREEZE.
    self.drop_fks()
    self.drop_pk()

    # These need to be one transaction to use COPY FREEZE
    with self.conn.begin():
        self.truncate()
        self.hdf_to_pg()

    # Restore constraints outside the COPY transaction, then ANALYZE.
    self.create_pk()
    self.create_fks()
    self.analyze()
9452

9553
def hdf_to_pg(self):
@@ -126,8 +84,26 @@ def hdf_to_pg(self):
12684

12785

12886
class ClassificationHDFTableCopy(HDFTableCopy):
129-
def __init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize=10 ** 6):
130-
HDFTableCopy.__init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize)
87+
def __init__(
    self,
    hdf_tables: list,
    hdf_meta: HDFMetadata,
    defer_sql_objs: bool = False,
    conn=None,
    table_obj=None,
    sql_table: str = None,
    csv_chunksize: int = 10 ** 6,
):
    """
    Thin subclass constructor: forwards every argument unchanged to
    HDFTableCopy; only hdf_to_pg differs for classification tables.
    """
    HDFTableCopy.__init__(
        self,
        hdf_tables,
        hdf_meta,
        defer_sql_objs=defer_sql_objs,
        conn=conn,
        table_obj=table_obj,
        sql_table=sql_table,
        csv_chunksize=csv_chunksize,
    )
131107

132108
def hdf_to_pg(self):
133109
if self.hdf_tables is None:
@@ -158,8 +134,26 @@ def hdf_to_pg(self):
158134

159135

160136
class BigHDFTableCopy(HDFTableCopy):
161-
def __init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize=10 ** 6):
162-
HDFTableCopy.__init__(self, sql_table, hdf_tables, hdf_meta, csv_chunksize)
137+
def __init__(
    self,
    hdf_tables: list,
    hdf_meta: HDFMetadata,
    defer_sql_objs: bool = False,
    conn=None,
    table_obj=None,
    sql_table=None,
    csv_chunksize: int = 10 ** 6,
):
    """
    Thin subclass constructor: forwards every argument unchanged to
    HDFTableCopy; only hdf_to_pg differs for very large tables.
    """
    super().__init__(
        hdf_tables,
        hdf_meta,
        defer_sql_objs,
        conn,
        table_obj,
        sql_table,
        csv_chunksize,
    )
163157

164158
def hdf_to_pg(self):
165159
if self.hdf_tables is None:

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.