Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b8c8439

Browse files
bleonard33 authored and makmanalp committed
Better logging for multiprocessing (cid-harvard#6)
1 parent ad12707 commit b8c8439
Copy full SHA for b8c8439

File tree

Expand file tree / Collapse file tree

6 files changed

+75
-59
lines changed
Filter options
Expand file tree / Collapse file tree

6 files changed

+75
-59
lines changed

‎pandas_to_postgres/__init__.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/__init__.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
33
from .hdf_to_postgres import hdf_to_postgres, create_hdf_table_objects, copy_worker
44
from .utilities import (
5-
logger,
65
hdf_metadata,
76
create_file_object,
87
df_generator,
98
cast_pandas,
9+
get_logger,
1010
)

‎pandas_to_postgres/_base_copy.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/_base_copy.py
+18-14Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .utilities import logger
1+
from .utilities import get_logger
22
from sqlalchemy.schema import AddConstraint, DropConstraint
33
from sqlalchemy.exc import SQLAlchemyError
44

@@ -36,14 +36,15 @@ def __init__(
3636
self.csv_chunksize = csv_chunksize
3737

3838
if not defer_sql_objs:
39-
self.instantiate_sql_objs(conn, table_obj)
39+
self.instantiate_attrs(conn, table_obj)
4040
else:
4141
self.sql_table = sql_table
4242

43-
def instantiate_sql_objs(self, conn, table_obj):
43+
def instantiate_attrs(self, conn, table_obj):
4444
"""
45-
When using multiprocessing, pickling of SQLAlchemy objects in __init__ causes
46-
issues, so allow for deferring until after the pickling to fetch SQLAlchemy objs
45+
When using multiprocessing, pickling of logger and SQLAlchemy objects in
46+
__init__ causes issues, so allow for deferring until after the pickling to fetch
47+
SQLAlchemy objs
4748
4849
Parameters
4950
----------
@@ -55,6 +56,7 @@ def instantiate_sql_objs(self, conn, table_obj):
5556
self.conn = conn
5657
self.table_obj = table_obj
5758
self.sql_table = table_obj.name
59+
self.logger = get_logger(self.sql_table)
5860
self.primary_key = table_obj.primary_key
5961
self.foreign_keys = table_obj.foreign_key_constraints
6062

@@ -63,45 +65,47 @@ def drop_pk(self):
6365
Drop primary key constraints on PostgreSQL table as well as CASCADE any other
6466
constraints that may rely on the PK
6567
"""
66-
logger.info("Dropping {} primary key".format(self.sql_table))
68+
self.logger.info("Dropping {} primary key".format(self.sql_table))
6769
try:
6870
with self.conn.begin_nested():
6971
self.conn.execute(DropConstraint(self.primary_key, cascade=True))
7072
except SQLAlchemyError:
71-
logger.info("{} primary key not found. Skipping".format(self.sql_table))
73+
self.logger.info(
74+
"{} primary key not found. Skipping".format(self.sql_table)
75+
)
7276

7377
def create_pk(self):
7478
"""Create primary key constraints on PostgreSQL table"""
75-
logger.info("Creating {} primary key".format(self.sql_table))
79+
self.logger.info("Creating {} primary key".format(self.sql_table))
7680
self.conn.execute(AddConstraint(self.primary_key))
7781

7882
def drop_fks(self):
7983
"""Drop foreign key constraints on PostgreSQL table"""
8084
for fk in self.foreign_keys:
81-
logger.info("Dropping foreign key {}".format(fk.name))
85+
self.logger.info("Dropping foreign key {}".format(fk.name))
8286
try:
8387
with self.conn.begin_nested():
8488
self.conn.execute(DropConstraint(fk))
8589
except SQLAlchemyError:
86-
logger.warn("Foreign key {} not found".format(fk.name))
90+
self.logger.warn("Foreign key {} not found".format(fk.name))
8791

8892
def create_fks(self):
8993
"""Create foreign key constraints on PostgreSQL table"""
9094
for fk in self.foreign_keys:
9195
try:
92-
logger.info("Creating foreign key {}".format(fk.name))
96+
self.logger.info("Creating foreign key {}".format(fk.name))
9397
self.conn.execute(AddConstraint(fk))
9498
except SQLAlchemyError:
95-
logger.warn("Error creating foreign key {}".format(fk.name))
99+
self.logger.warn("Error creating foreign key {}".format(fk.name))
96100

97101
def truncate(self):
98102
"""TRUNCATE PostgreSQL table"""
99-
logger.info("Truncating {}".format(self.sql_table))
103+
self.logger.info("Truncating {}".format(self.sql_table))
100104
self.conn.execute("TRUNCATE TABLE {};".format(self.sql_table))
101105

102106
def analyze(self):
103107
"""Run ANALYZE on PostgreSQL table"""
104-
logger.info("Analyzing {}".format(self.sql_table))
108+
self.logger.info("Analyzing {}".format(self.sql_table))
105109
self.conn.execute("ANALYZE {};".format(self.sql_table))
106110

107111
def copy_from_file(self, file_object):

‎pandas_to_postgres/copy_df.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/copy_df.py
+5-5Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .utilities import create_file_object, df_generator, logger, cast_pandas
1+
from .utilities import create_file_object, df_generator, cast_pandas
22
from ._base_copy import BaseCopy
33

44

@@ -38,17 +38,17 @@ def copy(self, functions=[cast_pandas]):
3838
with self.conn.begin():
3939
self.truncate()
4040

41-
logger.info("Creating generator for chunking dataframe")
41+
self.logger.info("Creating generator for chunking dataframe")
4242
for chunk in df_generator(self.df, self.csv_chunksize):
4343

44-
logger.info("Creating CSV in memory")
44+
self.logger.info("Creating CSV in memory")
4545
fo = create_file_object(chunk)
4646

47-
logger.info("Copying chunk to database")
47+
self.logger.info("Copying chunk to database")
4848
self.copy_from_file(fo)
4949
del fo
5050

51-
logger.info("All chunks copied ({} rows)".format(self.rows))
51+
self.logger.info("All chunks copied ({} rows)".format(self.rows))
5252

5353
self.create_pk()
5454
self.create_fks()

‎pandas_to_postgres/copy_hdf.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/copy_hdf.py
+34-28Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
from .utilities import create_file_object, df_generator, logger, cast_pandas
2-
from ._base_copy import BaseCopy
31
import pandas as pd
2+
from .utilities import create_file_object, df_generator, cast_pandas
3+
from ._base_copy import BaseCopy
44

55

66
class HDFTableCopy(BaseCopy):
@@ -90,33 +90,35 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
9090
data_formatter_kwargs: list of kwargs to pass to data_formatters functions
9191
"""
9292
if self.hdf_tables is None:
93-
logger.warn("No HDF table found for SQL table {}".format(self.sql_table))
93+
self.logger.warn(
94+
"No HDF table found for SQL table {}".format(self.sql_table)
95+
)
9496
return
9597

9698
for hdf_table in self.hdf_tables:
97-
logger.info("*** {} ***".format(hdf_table))
99+
self.logger.info("*** {} ***".format(hdf_table))
98100

99-
logger.info("Reading HDF table")
101+
self.logger.info("Reading HDF table")
100102
df = pd.read_hdf(self.file_name, key=hdf_table)
101103
self.rows += len(df)
102104

103105
data_formatter_kwargs["hdf_table"] = hdf_table
104-
logger.info("Formatting data")
106+
self.logger.info("Formatting data")
105107
df = self.data_formatting(
106108
df, functions=data_formatters, **data_formatter_kwargs
107109
)
108110

109-
logger.info("Creating generator for chunking dataframe")
110-
for chunk in df_generator(df, self.csv_chunksize):
111+
self.logger.info("Creating generator for chunking dataframe")
112+
for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
111113

112-
logger.info("Creating CSV in memory")
114+
self.logger.info("Creating CSV in memory")
113115
fo = create_file_object(chunk)
114116

115-
logger.info("Copying chunk to database")
117+
self.logger.info("Copying chunk to database")
116118
self.copy_from_file(fo)
117119
del fo
118120
del df
119-
logger.info("All chunks copied ({} rows)".format(self.rows))
121+
self.logger.info("All chunks copied ({} rows)".format(self.rows))
120122

121123

122124
class SmallHDFTableCopy(HDFTableCopy):
@@ -136,29 +138,29 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
136138
data_formatter_kwargs: list of kwargs to pass to data_formatters functions
137139
"""
138140
if self.hdf_tables is None:
139-
logger.warn("No HDF table found for SQL table {self.sql_table}")
141+
self.logger.warn("No HDF table found for SQL table {self.sql_table}")
140142
return
141143

142144
for hdf_table in self.hdf_tables:
143-
logger.info("*** {} ***".format(hdf_table))
144-
logger.info("Reading HDF table")
145+
self.logger.info("*** {} ***".format(hdf_table))
146+
self.logger.info("Reading HDF table")
145147
df = pd.read_hdf(self.file_name, key=hdf_table)
146148
self.rows += len(df)
147149

148150
data_formatter_kwargs["hdf_table"] = hdf_table
149-
logger.info("Formatting data")
151+
self.logger.info("Formatting data")
150152
df = self.data_formatting(
151153
df, functions=data_formatters, **data_formatter_kwargs
152154
)
153155

154-
logger.info("Creating CSV in memory")
156+
self.logger.info("Creating CSV in memory")
155157
fo = create_file_object(df)
156158

157-
logger.info("Copying table to database")
159+
self.logger.info("Copying table to database")
158160
self.copy_from_file(fo)
159161
del df
160162
del fo
161-
logger.info("All chunks copied ({} rows)".format(self.rows))
163+
self.logger.info("All chunks copied ({} rows)".format(self.rows))
162164

163165

164166
class BigHDFTableCopy(HDFTableCopy):
@@ -181,11 +183,13 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
181183
data_formatter_kwargs: list of kwargs to pass to data_formatters functions
182184
"""
183185
if self.hdf_tables is None:
184-
logger.warn("No HDF table found for SQL table {}".format(self.sql_table))
186+
self.logger.warn(
187+
"No HDF table found for SQL table {}".format(self.sql_table)
188+
)
185189
return
186190

187191
for hdf_table in self.hdf_tables:
188-
logger.info("*** {} ***".format(hdf_table))
192+
self.logger.info("*** {} ***".format(hdf_table))
189193

190194
with pd.HDFStore(self.file_name) as store:
191195
nrows = store.get_storer(hdf_table).nrows
@@ -199,26 +203,28 @@ def hdf_to_pg(self, data_formatters=[cast_pandas], data_formatter_kwargs={}):
199203
start = 0
200204

201205
for i in range(n_chunks):
202-
logger.info("*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks))
203-
logger.info("Reading HDF table")
206+
self.logger.info(
207+
"*** HDF chunk {i} of {n} ***".format(i=i + 1, n=n_chunks)
208+
)
209+
self.logger.info("Reading HDF table")
204210
stop = min(start + self.hdf_chunksize, nrows)
205211
df = pd.read_hdf(self.file_name, key=hdf_table, start=start, stop=stop)
206212

207213
start += self.hdf_chunksize
208214

209215
data_formatter_kwargs["hdf_table"] = hdf_table
210-
logger.info("Formatting data")
216+
self.logger.info("Formatting data")
211217
df = self.data_formatting(
212218
df, functions=data_formatters, **data_formatter_kwargs
213219
)
214220

215-
logger.info("Creating generator for chunking dataframe")
216-
for chunk in df_generator(df, self.csv_chunksize):
217-
logger.info("Creating CSV in memory")
221+
self.logger.info("Creating generator for chunking dataframe")
222+
for chunk in df_generator(df, self.csv_chunksize, logger=self.logger):
223+
self.logger.info("Creating CSV in memory")
218224
fo = create_file_object(chunk)
219225

220-
logger.info("Copying chunk to database")
226+
self.logger.info("Copying chunk to database")
221227
self.copy_from_file(fo)
222228
del fo
223229
del df
224-
logger.info("All chunks copied ({} rows)".format(self.rows))
230+
self.logger.info("All chunks copied ({} rows)".format(self.rows))

‎pandas_to_postgres/hdf_to_postgres.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/hdf_to_postgres.py
+5-2Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
from multiprocessing import Pool
22
from sqlalchemy import MetaData, create_engine
33
from .copy_hdf import HDFTableCopy
4-
from .utilities import cast_pandas
4+
from .utilities import cast_pandas, get_logger
5+
6+
7+
logger = get_logger("hdf_to_postgres")
58

69

710
def create_hdf_table_objects(
@@ -92,7 +95,7 @@ def copy_worker(
9295
if table_obj is None:
9396
raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
9497

95-
copy_obj.instantiate_sql_objs(conn, table_obj)
98+
copy_obj.instantiate_attrs(conn, table_obj)
9699

97100
# Run the task
98101
copy_obj.copy(

‎pandas_to_postgres/utilities.py

Copy file name to clipboardExpand all lines: pandas_to_postgres/utilities.py
+12-9Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import logging
2-
from pandas import HDFStore, isna
32
from collections import defaultdict
3+
from pandas import isna, HDFStore
44
from io import StringIO
55

66

7-
logging.basicConfig(
8-
level=logging.DEBUG,
9-
format="%(levelname)s %(asctime)s.%(msecs)03d %(message)s",
10-
datefmt="%Y-%m-%d,%H:%M:%S",
11-
)
7+
def get_logger(name):
8+
logging.basicConfig(
9+
level=logging.INFO,
10+
format="%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s %(message)s",
11+
datefmt="%Y-%m-%d,%H:%M:%S",
12+
)
1213

13-
logger = logging.getLogger("pandas_to_postgres")
14+
return logging.getLogger(name)
1415

1516

1617
def hdf_metadata(file_name, keys=None, metadata_attr=None, metadata_keys=[]):
@@ -42,6 +43,7 @@ def hdf_metadata(file_name, keys=None, metadata_attr=None, metadata_keys=[]):
4243

4344
sql_to_hdf = defaultdict(set)
4445
metadata_vars = defaultdict(dict)
46+
logger = get_logger("hdf_metadata")
4547

4648
with HDFStore(file_name, mode="r") as store:
4749
keys = keys or store.keys()
@@ -90,7 +92,7 @@ def create_file_object(df):
9092
return file_object
9193

9294

93-
def df_generator(df, chunksize=10 ** 6):
95+
def df_generator(df, chunksize=10 ** 6, logger=None):
9496
"""
9597
Create a generator to iterate over chunks of a dataframe
9698
@@ -108,7 +110,8 @@ def df_generator(df, chunksize=10 ** 6):
108110
n_chunks = (df.shape[0] // chunksize) + 1
109111

110112
for i in range(n_chunks):
111-
logger.info("Chunk {i}/{n}".format(i=i + 1, n=n_chunks))
113+
if logger:
114+
logger.info("Chunk {i}/{n}".format(i=i + 1, n=n_chunks))
112115
yield df.iloc[rows : rows + chunksize]
113116
rows += chunksize
114117

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.