 from multiprocessing import Pool

+from sqlalchemy import MetaData, create_engine
+
 from .copy_hdf import HDFTableCopy
 from .utilities import HDFMetadata

@@ -36,91 +38,90 @@ def create_hdf_table_objects(hdf_meta, csv_chunksize=10 ** 6):
     return tables


-def _copy_worker(copy_obj, defer_sql_objs=True):
-    """
-    Handle a SQLAlchemy connection and copy using HDFTableCopy object
+def _copy_worker(copy_obj, engine_args, engine_kwargs, maintenance_work_mem="1G"):
+
+    # Since we fork()ed into a new process, the engine contains process
+    # specific stuff that shouldn't be shared - this creates a fresh Engine
+    # with the same settings but without those.
+
+    engine = create_engine(*engine_args, **engine_kwargs)
+    metadata = MetaData(bind=engine)
+    metadata.reflect()
+
+    with engine.connect() as conn:

-    copy_obj: HDFTableCopy or subclass
-        Object to use to run the copy() method on
-    defer_sql_objs: bool
-        If True, SQL objects were not build upon instantiation of copy_obj and should
-        be built before copying data to db (needed for multiprocessing)
-    """
-    database.engine.dispose()
-    with database.engine.connect() as conn:
         conn.execution_options(autocommit=True)
-        conn.execute("SET maintenance_work_mem TO 1000000;")

-        if defer_sql_objs:
-            table_obj = database.metadata.tables[copy_obj.sql_table]
-            copy_obj.instantiate_sql_objs(conn, table_obj)
+        if maintenance_work_mem is not None:
+            conn.execute("SET maintenance_work_mem TO {};".format(maintenance_work_mem))

+        # Get SQLAlchemy Table object
+        table_obj = metadata.tables.get(copy_obj.sql_table, None)
+        if table_obj is None:
+            raise ValueError("Table {} does not exist.".format(copy_obj.sql_table))
+
+        copy_obj.instantiate_sql_objs(conn, table_obj)
+
+        # Run the task
         copy_obj.copy()


-def hdf_to_postgres(file_name, db, keys=[], csv_chunksize=10 ** 6):
+def hdf_to_postgres(file_name, engine_args, engine_kwargs={}, keys=[],
+                    csv_chunksize=10 ** 6, processes=None,
+                    maintenance_work_mem=None):
     """
     Copy tables in a HDF file to PostgreSQL database

     Parameters
     ----------
     file_name: str
         name of file or path to file of HDF to use to copy
-    db: SQLAlchemy database object
-        destination database
+    engine_args: list
+        arguments to pass into create_engine()
+    engine_kwargs: dict
+        keyword arguments to pass into create_engine()
     keys: list of strings
         HDF keys to copy
     csv_chunksize: int
         Maximum number of StringIO CSV rows to keep in memory at a time
+    processes: int or None
+        If None, run single threaded. If integer, number of processes in the
+        multiprocessing Pool
+    maintenance_work_mem: str or None
+        What to set postgresql's maintenance_work_mem option to: this helps
+        when rebuilding large indexes, etc.
     """

-    global database
-    database = db
-
     hdf = HDFMetadata(
         file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
     )

     tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)

-    for table in tables:
-        _copy_worker(table, defer_sql_objs=True)
+    if processes is None:

+        # Single-threaded run
+        for table in tables:
+            _copy_worker(table, engine_args, engine_kwargs, maintenance_work_mem)

-def multiprocess_hdf_to_postgres(
-    file_name, db, keys=[], processes=4, csv_chunksize=10 ** 6
-):
-    """
-    Copy tables in a HDF file to PostgreSQL database using a multiprocessing Pool
-
-    Parameters
-    ----------
-    file_name: str
-        Name of file or path to file of HDF to use to copy
-    db: SQLAlchemy object
-        Destination database
-    keys: list of strings
-        HDF keys to copy
-    processes: int
-        Number of processes in the Pool
-    csv_chunksize: int
-        Maximum number of StringIO CSV rows to keep in memory at a time
-    """
+    elif type(processes) is int:

-    global database
-    database = db
+        args = zip(
+            tables,
+            [engine_args] * len(tables),
+            [engine_kwargs] * len(tables),
+            [maintenance_work_mem] * len(tables)
+        )

-    hdf = HDFMetadata(
-        file_name, keys, metadata_attr="atlas_metadata", metadata_keys=["levels"]
-    )
+        try:
+            p = Pool(processes)
+            p.starmap(_copy_worker, args, chunksize=1)

-    tables = create_hdf_table_objects(hdf, csv_chunksize=csv_chunksize)
+        finally:
+            del tables
+            del hdf
+            p.close()
+            p.join()

-    try:
-        p = Pool(processes)
-        p.map(_copy_worker, tables, chunksize=1)
-    finally:
-        del tables
-        del hdf
-        p.close()
-        p.join()
+    else:
+        raise ValueError("processes should be int or None.")
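
For reference, a minimal usage sketch of the reworked hdf_to_postgres() entry point. The import path, database URI, file name, and HDF keys below are placeholders for illustration, not values taken from this diff:

# Hypothetical usage sketch of the new hdf_to_postgres() signature.
from pandas_to_postgres import hdf_to_postgres  # assumed import path for this module

hdf_to_postgres(
    "./data.h5",                                            # placeholder HDF file
    engine_args=["postgresql://user:pass@localhost/mydb"],  # forwarded to create_engine()
    engine_kwargs={"echo": False},                          # also forwarded to create_engine()
    keys=["country_year", "product_year"],                  # placeholder HDF keys to copy
    processes=4,                                            # Pool of 4 workers; None runs single-threaded
    maintenance_work_mem="1G",                              # each worker connection runs SET maintenance_work_mem
)

Passing engine_args and engine_kwargs instead of a live engine means each forked worker builds its own Engine via create_engine(), so no connection-pool state is shared across processes.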