Commit f8f6c82

Refactor the hdf_to_postgres functions into one, create the engine in
the worker threads instead of passing it in.

1 parent: 912c7fb
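
Why build the engine inside each worker rather than pass one in: with multiprocessing's fork start method, an Engine created in the parent process carries a connection pool whose open sockets would be shared by every child. The pattern below is a minimal sketch of that idea, assuming SQLAlchemy 1.x (consistent with the diff's MetaData(bind=engine)) and a hypothetical local DSN; only the engine's construction arguments cross the process boundary, never a live Engine.

from multiprocessing import Pool

from sqlalchemy import create_engine


def _worker(engine_args, engine_kwargs):
    # Created after the fork, so no pooled sockets are shared between
    # parent and child processes.
    engine = create_engine(*engine_args, **engine_kwargs)
    with engine.connect() as conn:
        conn.execute("SELECT 1")
    engine.dispose()


if __name__ == "__main__":
    # "postgresql://localhost/mydb" is a hypothetical DSN.
    args = [(["postgresql://localhost/mydb"], {})] * 4
    with Pool(4) as pool:
        pool.starmap(_worker, args, chunksize=1)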
2 files changed: +57 −57 lines changed
‎pandas_to_postgres/__init__.py

0 additions, 1 deletion

@@ -2,7 +2,6 @@
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
 from .hdf_to_postgres import (
     hdf_to_postgres,
-    multiprocess_hdf_to_postgres,
     create_hdf_table_objects,
 )
 from .utilities import (
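
For downstream code, the visible change in this file is the removed export; an import of the old function would now use the single entry point, for example:

# Before this commit (no longer available):
# from pandas_to_postgres import multiprocess_hdf_to_postgres

# After: one function covers both the serial and the Pool-based path.
from pandas_to_postgres import hdf_to_postgres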

‎pandas_to_postgres/hdf_to_postgres.py

57 additions, 56 deletions

@@ -1,5 +1,7 @@
 from multiprocessing import Pool
 
+from sqlalchemy import MetaData, create_engine
+
 from .copy_hdf import HDFTableCopy
 from .utilities import HDFMetadata
 

@@ -36,91 +38,90 @@ def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
     return tables
 
 
-def _copy_worker(copy_obj, defer_sql_objs=True):
-    """
-    Handle a SQLAlchemy connection and copy using HDFTableCopy object
+def _copy_worker(copy_obj, engine_args, engine_kwargs, maintenance_work_mem="1G"):
+
+    # Since we fork()ed into a new process, the engine contains process
+    # specific stuff that shouldn't be shared - this creates a fresh Engine
+    # with the same settings but without those.
+
+    engine = create_engine(*engine_args, **engine_kwargs)
+    metadata = MetaData(bind=engine)
+    metadata.reflect()
+
+    with engine.connect() as conn:
 
-    copy_obj: HDFTableCopy or subclass
-        Object to use to run the copy() method on
-    defer_sql_objs: bool
-        If True, SQL objects were not built upon instantiation of copy_obj and should
-        be built before copying data to db (needed for multiprocessing)
-    """
-    database.engine.dispose()
-    with database.engine.connect() as conn:
         conn.execution_options(autocommit=True)
-        conn.execute("SET maintenance_work_mem TO 1000000;")
 
-        if defer_sql_objs:
-            table_obj = database.metadata.tables[copy_obj.sql_table]
-            copy_obj.instantiate_sql_objs(conn, table_obj)
+        if maintenance_work_mem is not None:
+            conn.execute("SET maintenance_work_mem TO {};".format(maintenance_work_mem))
 
+        # Get SQLAlchemy Table object
+        table_obj = metadata.tables.get(copy_obj.sql_table, None)
+        if table_obj is None:
+            raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
+
+        copy_obj.instantiate_sql_objs(conn, table_obj)
+
+        # Run the task
        copy_obj.copy()
 
 
-def hdf_to_postgres(file_name, db, keys=[], csv_chunksize=10 ** 6):
+def hdf_to_postgres(file_name, engine_args, engine_kwargs={}, keys=[],
+                    csv_chunksize=10 ** 6, processes=None,
+                    maintenance_work_mem=None):
     """
     Copy tables in a HDF file to PostgreSQL database
 
     Parameters
     ----------
     file_name: str
         name of file or path to file of HDF to use to copy
-    db: SQLAlchemy database object
-        destination database
+    engine_args: list
+        arguments to pass into create_engine()
+    engine_kwargs: dict
+        keyword arguments to pass into create_engine()
     keys: list of strings
         HDF keys to copy
     csv_chunksize: int
         Maximum number of StringIO CSV rows to keep in memory at a time
+    processes: int or None
+        If None, run single threaded. If integer, number of processes in the
+        multiprocessing Pool
+    maintenance_work_mem: str or None
+        What to set postgresql's maintenance_work_mem option to: this helps
+        when rebuilding large indexes, etc.
     """
 
-    global database
-    database = db
-
     hdf = HDFMetadata(
         file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
     )
 
     tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
 
-    for table in tables:
-        _copy_worker(table, defer_sql_objs=True)
+    if processes is None:
 
+        # Single-threaded run
+        for table in tables:
+            _copy_worker(table, engine_args, engine_kwargs, maintenance_work_mem)
 
-def multiprocess_hdf_to_postgres(
-    file_name, db, keys=[], processes=4, csv_chunksize=10 ** 6
-):
-    """
-    Copy tables in a HDF file to PostgreSQL database using a multiprocessing Pool
-
-    Parameters
-    ----------
-    file_name: str
-        Name of file or path to file of HDF to use to copy
-    db: SQLAlchemy object
-        Destination database
-    keys: list of strings
-        HDF keys to copy
-    processes: int
-        Number of processes in the Pool
-    csv_chunksize: int
-        Maximum number of StringIO CSV rows to keep in memory at a time
-    """
+    elif type(processes) is int:
 
-    global database
-    database = db
+        args = zip(
+            tables,
+            [engine_args] * len(tables),
+            [engine_kwargs] * len(tables),
+            [maintenance_work_mem] * len(tables)
+        )
 
-    hdf = HDFMetadata(
-        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
-    )
+        try:
+            p = Pool(processes)
+            p.starmap(_copy_worker, args, chunksize=1)
 
-    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+        finally:
+            del tables
+            del hdf
+            p.close()
+            p.join()
 
-    try:
-        p = Pool(processes)
-        p.map(_copy_worker, tables, chunksize=1)
-    finally:
-        del tables
-        del hdf
-        p.close()
-        p.join()
+    else:
+        raise ValueError("processes should be int or None.")
