Commit c082a75

Merge pull request cid-harvard#4 from cid-harvard/feature/hdf-to-postgres-refactor
Feature/hdf to postgres refactor
2 parents 74edfb7 + 48d735f commit c082a75

File tree: 2 files changed (+64, -57 lines)

pandas_to_postgres/__init__.py

0 additions & 1 deletion
@@ -2,7 +2,6 @@
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
 from .hdf_to_postgres import (
     hdf_to_postgres,
-    multiprocess_hdf_to_postgres,
     create_hdf_table_objects,
 )
 from .utilities import (

pandas_to_postgres/hdf_to_postgres.py

64 additions & 56 deletions
@@ -1,5 +1,9 @@
 from multiprocessing import Pool
-from .copy_hdf import HDFTableCopy, HDFMetadata
+
+from sqlalchemy import MetaData, create_engine
+
+from .copy_hdf import HDFTableCopy
+from .utilities import HDFMetadata
 
 
 def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
@@ -34,91 +38,95 @@ def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
     return tables
 
 
-def _copy_worker(copy_obj, defer_sql_objs=True):
-    """
-    Handle a SQLAlchemy connection and copy using HDFTableCopy object
+def _copy_worker(copy_obj, engine_args, engine_kwargs, maintenance_work_mem="1G"):
+
+    # Since we fork()ed into a new process, the engine contains process
+    # specific stuff that shouldn't be shared - this creates a fresh Engine
+    # with the same settings but without those.
+
+    engine = create_engine(*engine_args, **engine_kwargs)
+    metadata = MetaData(bind=engine)
+    metadata.reflect()
+
+    with engine.connect() as conn:
 
-    copy_obj: HDFTableCopy or subclass
-        Object to use to run the copy() method on
-    defer_sql_objs: bool
-        If True, SQL objects were not build upon instantiation of copy_obj and should
-        be built before copying data to db (needed for multiprocessing)
-    """
-    database.engine.dispose()
-    with database.engine.connect() as conn:
         conn.execution_options(autocommit=True)
-        conn.execute("SET maintenance_work_mem TO 1000000;")
 
-        if defer_sql_objs:
-            table_obj = database.metadata.tables[copy_obj.sql_table]
-            copy_obj.instantiate_sql_objs(conn, table_obj)
+        if maintenance_work_mem is not None:
+            conn.execute("SET maintenance_work_mem TO {};".format(maintenance_work_mem))
+
+        # Get SQLAlchemy Table object
+        table_obj = metadata.tables.get(copy_obj.sql_table, None)
+        if table_obj is None:
+            raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
+
+        copy_obj.instantiate_sql_objs(conn, table_obj)
 
+        # Run the task
         copy_obj.copy()
 
 
-def hdf_to_postgres(file_name, db, keys=[], csv_chunksize=10 ** 6):
+def hdf_to_postgres(file_name, engine_args, engine_kwargs={}, keys=[],
+                    csv_chunksize=10 ** 6, processes=None,
+                    maintenance_work_mem=None):
     """
     Copy tables in a HDF file to PostgreSQL database
 
     Parameters
     ----------
     file_name: str
         name of file or path to file of HDF to use to copy
-    db: SQLAlchemy database object
-        destination database
+    engine_args: list
+        arguments to pass into create_engine()
+    engine_kwargs: dict
+        keyword arguments to pass into create_engine()
     keys: list of strings
         HDF keys to copy
     csv_chunksize: int
         Maximum number of StringIO CSV rows to keep in memory at a time
+    processes: int or None
+        If None, run single threaded. If integer, number of processes in the
+        multiprocessing Pool
+    maintenance_work_mem: str or None
+        What to set postgresql's maintenance_work_mem option to: this helps
+        when rebuilding large indexes, etc.
     """
 
-    global database
-    database = db
-
     hdf = HDFMetadata(
         file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
     )
 
     tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
 
-    for table in tables:
-        _copy_worker(table, defer_sql_objs=True)
+    if processes is None:
 
+        # Single-threaded run
+        for table in tables:
+            _copy_worker(table, engine_args, engine_kwargs, maintenance_work_mem)
 
-def multiprocess_hdf_to_postgres(
-    file_name, db, keys=[], processes=4, csv_chunksize=10 ** 6
-):
-    """
-    Copy tables in a HDF file to PostgreSQL database using a multiprocessing Pool
+    elif type(processes) is int:
 
-    Parameters
-    ----------
-    file_name: str
-        Name of file or path to file of HDF to use to copy
-    db: SQLAlchemy object
-        Destination database
-    keys: list of strings
-        HDF keys to copy
-    processes: int
-        Number of processes in the Pool
-    csv_chunksize: int
-        Maximum number of StringIO CSV rows to keep in memory at a time
-    """
+        args = zip(
+            tables,
+            [engine_args] * len(tables),
+            [engine_kwargs] * len(tables),
+            [maintenance_work_mem] * len(tables)
+        )
 
-    global database
-    database = db
+        try:
+            p = Pool(processes)
+            result = p.starmap_async(_copy_worker, args, chunksize=1)
 
-    hdf = HDFMetadata(
-        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
-    )
+        finally:
+            del tables
+            del hdf
+            p.close()
+            p.join()
 
-    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+        if not result.successful():
+            # If there's an exception, throw it, but we don't care about the
+            # results
+            result.get()
 
-    try:
-        p = Pool(processes)
-        p.map(_copy_worker, tables, chunksize=1)
-    finally:
-        del tables
-        del hdf
-        p.close()
-        p.join()
+    else:
+        raise ValueError("processes should be int or None.")
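Usage note: a minimal sketch of how a caller might invoke the refactored entry point. Instead of passing a SQLAlchemy database object, the caller now supplies engine_args and engine_kwargs, from which each worker builds its own Engine after the fork. The HDF file path, connection URL, keys, and keyword arguments below are hypothetical placeholders, not values from this repository.

from pandas_to_postgres import hdf_to_postgres

# Hypothetical inputs -- adjust the HDF file path, connection URL, and keys.
hdf_to_postgres(
    "data.h5",                                     # HDF file to copy from
    ["postgresql://user:pass@localhost:5432/db"],  # engine_args, expanded via create_engine(*engine_args)
    engine_kwargs={"echo": False},                 # extra create_engine() keyword arguments
    keys=["table_a", "table_b"],                   # HDF keys to copy
    processes=4,                                   # int -> multiprocessing Pool; None -> single-threaded
    maintenance_work_mem="1GB",                    # per-connection setting, helps when rebuilding indexes
)

Because processes is an int here, the tables are copied via Pool.starmap_async, and each worker creates a fresh engine from engine_args and engine_kwargs rather than reusing a connection pool inherited across fork().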
