Commit 4bc9bcb

plug-and-play functions to copy HDF files to PostgreSQL

1 parent 4c41d08
3 files changed: 99 additions, 1 deletion
pandas_to_postgres/__init__.py  (5 additions, 0 deletions)
@@ -1,5 +1,10 @@
 from .copy_df import DataFrameCopy
 from .copy_hdf import HDFTableCopy, SmallHDFTableCopy, BigHDFTableCopy
+from .hdf_to_postgres import (
+    hdf_to_postgres,
+    multiprocess_hdf_to_postgres,
+    create_hdf_table_objects,
+)
 from .utilities import (
     logger,
     HDFMetadata,
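With these lines in place, the new helpers become part of the package's top-level API, so downstream code can import them directly from the package root; a minimal sketch:

    # the new entry points added by this commit, importable from the package root
    from pandas_to_postgres import (
        hdf_to_postgres,
        multiprocess_hdf_to_postgres,
        create_hdf_table_objects,
    )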

pandas_to_postgres/hdf_to_postgres.py  (new file, 92 additions, 0 deletions)
@@ -0,0 +1,92 @@
+from typing import List
+from multiprocessing import Pool
+from flask_sqlalchemy import SQLAlchemy
+from pandas_to_postgres import HDFTableCopy, HDFMetadata
+
+
+def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
+    tables = []
+
+    for sql_table, hdf_tables in hdf_meta.sql_to_hdf.items():
+        tables.append(
+            HDFTableCopy(
+                hdf_tables,
+                hdf_meta,
+                defer_sql_objs=True,
+                sql_table=sql_table,
+                csv_chunksize=csv_chunksize,
+            )
+        )
+
+    return tables
+
+
+def copy_worker(db: SQLAlchemy, copy_obj: HDFTableCopy, defer_sql_objs: bool = True):
+    db.engine.dispose()
+    with db.engine.connect() as conn:
+        conn = conn.execution_options(autocommit=True)
+        conn.execute("SET maintenance_work_mem TO 1000000;")
+
+        if defer_sql_objs:
+            table_obj = db.metadata.tables[copy_obj.sql_table]
+            copy_obj.instantiate_sql_objs(conn, table_obj)
+
+        copy_obj.copy()
+
+
+def hdf_to_postgres(
+    file_name: str, db: SQLAlchemy, keys: List[str] = [], csv_chunksize: int = 10 ** 6
+):
+    """
+    Copy tables in an HDF file to a PostgreSQL database
+
+    Parameters
+    ----------
+    file_name: name of or path to the HDF file to copy from
+    db: SQLAlchemy object for the destination database
+    keys: list of HDF keys to copy
+    csv_chunksize: maximum number of StringIO CSV rows to keep in memory at a time
+    """
+    hdf = HDFMetadata(
+        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
+    )
+
+    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+
+    for table in tables:
+        copy_worker(db, table, defer_sql_objs=True)
+
+
+def multiprocess_hdf_to_postgres(
+    file_name: str,
+    db: SQLAlchemy,
+    keys: List[str] = [],
+    processes: int = 4,
+    csv_chunksize: int = 10 ** 6,
+):
+    """
+    Copy tables in an HDF file to a PostgreSQL database using a multiprocessing Pool
+
+    Parameters
+    ----------
+    file_name: name of or path to the HDF file to copy from
+    db: SQLAlchemy object for the destination database
+    keys: list of HDF keys to copy
+    processes: number of processes in the Pool
+    csv_chunksize: maximum number of StringIO CSV rows to keep in memory at a time
+    """
+
+    hdf = HDFMetadata(
+        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
+    )
+
+    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+
+    p = Pool(processes)
+    try:
+        p.starmap(copy_worker, [(db, table) for table in tables], chunksize=1)
+    finally:
+        del tables
+        del hdf
+        p.close()
+        p.join()
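Taken together, the module is meant to be plug-and-play: hand it an HDF file and a database handle and it copies every mapped table. A minimal usage sketch, assuming a Flask-SQLAlchemy-style `db` whose `metadata` describes the destination tables and an HDF file carrying the hardcoded `atlas_metadata` attribute (the module path `myapp.database`, the file name, and the keys below are hypothetical):

    from pandas_to_postgres import hdf_to_postgres, multiprocess_hdf_to_postgres
    from myapp.database import db  # hypothetical Flask-SQLAlchemy instance

    # copy each table serially over a single engine
    hdf_to_postgres("./data.h5", db, keys=["products", "locations"])

    # or fan the per-table copies out across a Pool; copy_worker disposes the
    # engine inherited from the parent so each forked worker reconnects cleanly
    multiprocess_hdf_to_postgres(
        "./data.h5", db, keys=["products", "locations"], processes=4
    )

Disposing the engine inside `copy_worker` matters for the multiprocessing path: connections opened before the fork cannot safely be shared, so each worker starts from a fresh connection pool.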

pandas_to_postgres/utilities.py  (2 additions, 1 deletion)
@@ -34,7 +34,8 @@ def __init__(
         ----------
         file_name: path to hdf file to copy from
         keys: list of hdf keys to copy data from
-        chunksize: maximum rows read from an hdf file into a pandas dataframe
+        chunksize: maximum rows read from an hdf file into a pandas dataframe when using
+            the BigTable protocol
         metadata_attr: location of relevant metadata in store.get_storer().attrs
         metadata_keys: list of keys to get from metadata store
         """
