@@ -1,5 +1,9 @@
 from multiprocessing import Pool
-from .copy_hdf import HDFTableCopy, HDFMetadata
+
+from sqlalchemy import MetaData, create_engine
+
+from .copy_hdf import HDFTableCopy
+from .utilities import HDFMetadata
 
 
 def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
@@ -34,91 +38,95 @@ def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
     return tables
 
 
-def _copy_worker(copy_obj, defer_sql_objs=True):
-    """
-    Handle a SQLAlchemy connection and copy using HDFTableCopy object
+def _copy_worker(copy_obj, engine_args, engine_kwargs, maintenance_work_mem="1G"):
+
+    # Since we fork()ed into a new process, the engine contains process
+    # specific stuff that shouldn't be shared - this creates a fresh Engine
+    # with the same settings but without those.
+
+    engine = create_engine(*engine_args, **engine_kwargs)
+    metadata = MetaData(bind=engine)
+    metadata.reflect()
+
+    with engine.connect() as conn:
 
-    copy_obj: HDFTableCopy or subclass
-        Object to use to run the copy() method on
-    defer_sql_objs: bool
-        If True, SQL objects were not build upon instantiation of copy_obj and should
-        be built before copying data to db (needed for multiprocessing)
-    """
-    database.engine.dispose()
-    with database.engine.connect() as conn:
         conn.execution_options(autocommit=True)
-        conn.execute("SET maintenance_work_mem TO 1000000;")
 
-        if defer_sql_objs:
-            table_obj = database.metadata.tables[copy_obj.sql_table]
-            copy_obj.instantiate_sql_objs(conn, table_obj)
+        if maintenance_work_mem is not None:
+            conn.execute("SET maintenance_work_mem TO {};".format(maintenance_work_mem))
+
+        # Get SQLAlchemy Table object
+        table_obj = metadata.tables.get(copy_obj.sql_table, None)
+        if table_obj is None:
+            raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
+
+        copy_obj.instantiate_sql_objs(conn, table_obj)
 
+        # Run the task
         copy_obj.copy()
 
 
-def hdf_to_postgres(file_name, db, keys=[], csv_chunksize=10 ** 6):
+def hdf_to_postgres(file_name, engine_args, engine_kwargs={}, keys=[],
+                    csv_chunksize=10 ** 6, processes=None,
+                    maintenance_work_mem=None):
     """
     Copy tables in a HDF file to PostgreSQL database
 
     Parameters
     ----------
    file_name: str
        name of file or path to file of HDF to use to copy
-    db: SQLAlchemy database object
-        destination database
+    engine_args: list
+        arguments to pass into create_engine()
+    engine_kwargs: dict
+        keyword arguments to pass into create_engine()
     keys: list of strings
         HDF keys to copy
     csv_chunksize: int
         Maximum number of StringIO CSV rows to keep in memory at a time
+    processes: int or None
+        If None, run single threaded. If integer, number of processes in the
+        multiprocessing Pool
+    maintenance_work_mem: str or None
+        What to set postgresql's maintenance_work_mem option to: this helps
+        when rebuilding large indexes, etc.
     """
 
-    global database
-    database = db
-
     hdf = HDFMetadata(
         file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
     )
 
     tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
 
-    for table in tables:
-        _copy_worker(table, defer_sql_objs=True)
+    if processes is None:
 
+        # Single-threaded run
+        for table in tables:
+            _copy_worker(table, engine_args, engine_kwargs, maintenance_work_mem)
 
-def multiprocess_hdf_to_postgres(
-    file_name, db, keys=[], processes=4, csv_chunksize=10 ** 6
-):
-    """
-    Copy tables in a HDF file to PostgreSQL database using a multiprocessing Pool
+    elif type(processes) is int:
 
-    Parameters
-    ----------
-    file_name: str
-        Name of file or path to file of HDF to use to copy
-    db: SQLAlchemy object
-        Destination database
-    keys: list of strings
-        HDF keys to copy
-    processes: int
-        Number of processes in the Pool
-    csv_chunksize: int
-        Maximum number of StringIO CSV rows to keep in memory at a time
-    """
+        args = zip(
+            tables,
+            [engine_args] * len(tables),
+            [engine_kwargs] * len(tables),
+            [maintenance_work_mem] * len(tables)
+        )
 
-    global database
-    database = db
+        try:
+            p = Pool(processes)
+            result = p.starmap_async(_copy_worker, args, chunksize=1)
 
-    hdf = HDFMetadata(
-        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
-    )
+        finally:
+            del tables
+            del hdf
+            p.close()
+            p.join()
 
-    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+        if not result.successful():
+            # If there's an exception, throw it, but we don't care about the
+            # results
+            result.get()
 
-    try:
-        p = Pool(processes)
-        p.map(_copy_worker, tables, chunksize=1)
-    finally:
-        del tables
-        del hdf
-        p.close()
-        p.join()
+    else:
+        raise ValueError("processes should be int or None.")
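
For reference, a minimal sketch of how the reworked entry point might be called after this change; the import path, connection URL, file name, and HDF keys below are illustrative placeholders, not part of the commit:

# Placeholder import path and connection settings for illustration only.
from hdf_to_postgres import hdf_to_postgres

engine_args = ["postgresql://user:password@localhost:5432/mydb"]  # passed to create_engine()
engine_kwargs = {"echo": False}                                   # also passed to create_engine()

# Single-process run (processes=None is the default)
hdf_to_postgres("data.h5", engine_args, engine_kwargs=engine_kwargs,
                keys=["table_a", "table_b"])

# Four worker processes, with extra maintenance_work_mem for index rebuilds
hdf_to_postgres("data.h5", engine_args, engine_kwargs=engine_kwargs,
                keys=["table_a", "table_b"], processes=4,
                maintenance_work_mem="1GB")

Because each worker rebuilds its own Engine from engine_args and engine_kwargs, those arguments must be picklable so they can be shipped to the Pool processes.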