diff --git a/ChangeLog b/ChangeLog index de2563c..393bd11 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,24 @@ +* 4.0.1 Jul 23 2019 + +- Update testWrite.py to be compatible with windows, add "--help" option and usage, validate when arguments are provided + +- Regenerate and move experimental patch into "experimental" directory (not ready for production use, not included in pypi package) + +* 4.0.0 Jan 6 2017 +- Update testWrite.py program for better testing of I/O priority levels in +BackgroundWrite given your default load +- Fixup equations in BackgroundWrite to better guarentee a given priority +(bandwidth percent) +- Fine-Tune default I/O priority levels in BackgroundWrite to ensure a higher +overall performance, a more constant difference between other task priorities +and this task priority, less of a difference in default block size, etc. +- Minor optimizations in BackgroundWrite +- BACKWORDS-INCOMPATIBLE: BackgroundIOPriority now has a different constructor, so if you specified +custom priorities (instead of passing ioprio=1-9) these will need to be +adjusted. +- Make ioprio=1 not sleep at all, i.e. maximum available I/O in background + + * 3.0.0 Feb 5 2016 - BUGFIX: Fix typo in MANIFEST.in - FEATURE: Update nonblock_read so it works with socket-like objects as well as file-like diff --git a/doc/.gitignore b/doc/.gitignore new file mode 100644 index 0000000..c4c4ffc --- /dev/null +++ b/doc/.gitignore @@ -0,0 +1 @@ +*.zip diff --git a/doc/index.html b/doc/index.html new file mode 120000 index 0000000..a08165e --- /dev/null +++ b/doc/index.html @@ -0,0 +1 @@ +nonblock.html \ No newline at end of file diff --git a/doc/nonblock.BackgroundRead.html b/doc/nonblock.BackgroundRead.html index 5c5c7dd..fa6fd33 100644 --- a/doc/nonblock.BackgroundRead.html +++ b/doc/nonblock.BackgroundRead.html @@ -1,118 +1,118 @@ -Python: module nonblock.BackgroundRead - - +Python: module nonblock.BackgroundRead + + - - -
 
- 
nonblock.BackgroundRead
index
-

Copyright (c) 2015-2016 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution.

+ + +
 
+ 
nonblock.BackgroundRead
index
+

Copyright (c) 2015-2016 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution.

read.py Contains pure-python functions for non-blocking reads in python

-

- - - +

+

 
-Modules
+ + - -
 
+Modules
       
threading
-
time
-

- - - + +
 
-Classes
       
threading
+
time
+

+ + + - - +
 
+Classes
       
-
builtins.object -
-
-
BackgroundReadData +
       
+
builtins.object +
+
+
BackgroundReadData
-

- - - +

+

 
-class BackgroundReadData(builtins.object)
+ + - - - - + + +
 
+class BackgroundReadData(builtins.object)
   BackgroundReadData - An object returned by the bgread function. This object is automatically populated in the background by a thread with data read off the stream.

-It contains the following attributes:

-    blocks - The raw non-zero length blocks read from the stream, in order.

-    data - A calculated property, which is a bytes/str (depending on stream mode). It is the joining of all the read blocks, and contains all the data read to-date.

-    isFinished - starts False, and becomes True after all data has been read from the stream. Will remain False if there is an exception raised during I/O
-    
-    error - starts None, and is set to any exception that is raised during reading (which will also terminate the thread)
 
 Methods defined here:
-
__init__(self, dataType)
+
   BackgroundReadData(dataType)

+BackgroundReadData - An object returned by the bgread function. This object is automatically populated in the background by a thread with data read off the stream.

+It contains the following attributes:

+    blocks - The raw non-zero length blocks read from the stream, in order.

+    data - A calculated property, which is a bytes/str (depending on stream mode). It is the joining of all the read blocks, and contains all the data read to-date.

+    isFinished - starts False, and becomes True after all data has been read from the stream. Will remain False if there is an exception raised during I/O

+    error - starts None, and is set to any exception that is raised during reading (which will also terminate the thread)
 
 Methods defined here:
+
__init__(self, dataType)
Initialize self.  See help(type(self)) for accurate signature.
-
addBlock(self, block)
+
addBlock(self, block)
-
-Data descriptors defined here:
-
__dict__
-
dictionary for instance variables (if defined)
+
+Data descriptors defined here:
+
__dict__
+
dictionary for instance variables (if defined)
-
__weakref__
-
list of weak references to the object (if defined)
+
__weakref__
+
list of weak references to the object (if defined)
-
data
-
data - property to get the data as a string or bytes.
-    Use "blocks" to access the individual blocks of data

+
data
+
data - property to get the data as a string or bytes.
+    Use "blocks" to access the individual blocks of data

@return <str or bytes> - All data currently read, as a string or bytes (depending on the dataType)
-

- - - +
 
-Functions

+ + + - -
 
+Functions
       
bgread(stream, blockSizeLimit=65535, pollTime=0.03, closeStream=True)
bgread - Start a thread which will read from the given stream in a non-blocking fashion, and automatically populate data in the returned object.

-    @param stream <object> - A stream on which to read. Socket, file, etc.

-    @param blockSizeLimit <None/int> - Number of bytes. Default 65535.

-        If None, the stream will be read from until there is no more available data (not closed, but you've read all that's been flushed to straem). This is okay for smaller datasets, but this number effectively controls the amount of CPU time spent in I/O on this stream VS everything else in your application. The default of 65535 bytes is a fair amount of data.

-    @param pollTime <float> - Default .03 (30ms) After all available data has been read from the stream, wait this many seconds before checking again for more data.
-        
-        A low number here means a high priority, i.e. more cycles will be devoted to checking and collecting the background data. Since this is a non-blocking read, this value is the "block", which will return execution context to the remainder of the application. The default of 100ms should be fine in most cases. If it's really idle data collection, you may want to try a value of 1 second.

-    @param closeStream <bool> - Default True. If True, the "close" method on the stream object will be called when the other side has closed and all data has been read.



-NOTES --

-        blockSizeLimit / pollTime is your effective max-throughput. Real throughput will be lower than this number, as the actual throughput is be defined by:

-        T = (blockSizeLimit / pollTime) - DeviceReadTime(blockSizeLimit)

-    Using the defaults of .03 and 65535 means you'll read up to 2 MB per second. Keep in mind that the more time spent in I/O means less time spent doing other tasks.


-    @return - The return of this function is a BackgroundReadData object. This object contains an attribute "blocks" which is a list of the non-zero-length blocks that were read from the stream. The object also contains a calculated property, "data", which is a string/bytes (depending on stream mode) of all the data currently read. The property "isFinished" will be set to True when the stream has been closed. The property "error" will be set to any exception that occurs during reading which will terminate the thread. @see BackgroundReadData for more info.
-

- - - + +
 
-Data
       
bgread(stream, blockSizeLimit=65535, pollTime=0.03, closeStream=True)
bgread - Start a thread which will read from the given stream in a non-blocking fashion, and automatically populate data in the returned object.

+    @param stream <object> - A stream on which to read. Socket, file, etc.

+    @param blockSizeLimit <None/int> - Number of bytes. Default 65535.

+        If None, the stream will be read from until there is no more available data (not closed, but you've read all that's been flushed to stream). This is okay for smaller datasets, but this number effectively controls the amount of CPU time spent in I/O on this stream VS everything else in your application. The default of 65535 bytes is a fair amount of data.

+    @param pollTime <float> - Default .03 (30ms) After all available data has been read from the stream, wait this many seconds before checking again for more data.

+        A low number here means a high priority, i.e. more cycles will be devoted to checking and collecting the background data. Since this is a non-blocking read, this value is the "block", which will return execution context to the remainder of the application. The default of 30ms should be fine in most cases. If it's really idle data collection, you may want to try a value of 1 second.

+    @param closeStream <bool> - Default True. If True, the "close" method on the stream object will be called when the other side has closed and all data has been read.



+NOTES --

+        blockSizeLimit / pollTime is your effective max-throughput. Real throughput will be lower than this number, as the actual throughput is defined by:

+        T = (blockSizeLimit / pollTime) - DeviceReadTime(blockSizeLimit)

+    Using the defaults of .03 and 65535 means you'll read up to 2 MB per second. Keep in mind that the more time spent in I/O means less time spent doing other tasks.


+    @return - The return of this function is a BackgroundReadData object. This object contains an attribute "blocks" which is a list of the non-zero-length blocks that were read from the stream. The object also contains a calculated property, "data", which is a string/bytes (depending on stream mode) of all the data currently read. The property "isFinished" will be set to True when the stream has been closed. The property "error" will be set to any exception that occurs during reading which will terminate the thread. @see BackgroundReadData for more info.
+

+ + + - -
 
+Data
       __all__ = ('BackgroundReadData', 'bgread')
- +        +__all__ = ('BackgroundReadData', 'bgread') +

\ No newline at end of file diff --git a/doc/nonblock.BackgroundWrite.html b/doc/nonblock.BackgroundWrite.html index 28e6a34..611d92b 100644 --- a/doc/nonblock.BackgroundWrite.html +++ b/doc/nonblock.BackgroundWrite.html @@ -1,192 +1,275 @@ -Python: module nonblock.BackgroundWrite - - +Python: module nonblock.BackgroundWrite + + - - -
 
- 
nonblock.BackgroundWrite
index
-

Copyright (c) 2015 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution.

+ + +
 
+ 
nonblock.BackgroundWrite
index
+

Copyright (c) 2015, 2016, 2017 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution.

BackgroundWrite.py Contains pure-python functions for non-blocking background writing (writing multiple streams at once; interactive writing allowing a high amount of CPU time for calculations/other tasks

-

- - - +

+

 
-Modules
+ + - -
 
+Modules
       
threading
-
time
-

- - - + +
 
-Classes
       
threading
+
time
+

+ + + - - +
 
+Classes
       
-
builtins.object -
-
-
BackgroundIOPriority +
       
+
builtins.object +
+
+
BackgroundIOPriority
-
threading.Thread(builtins.object) -
-
-
BackgroundWriteProcess +
threading.Thread(builtins.object) +
+
+
BackgroundWriteProcess
-

- - - +

+

 
-class BackgroundIOPriority(builtins.object)
+ + - - - - + + +
 
+class BackgroundIOPriority(builtins.object)
   BackgroundIOPriority - Priority Profile for doing background writes.

-    See __init__ for fields
 
 Methods defined here:
-
__getitem__(self, key)
+
   BackgroundIOPriority(chainPollTime, defaultChunkSize, bandwidthPct, numChunksRateSmoothing=5)

+BackgroundIOPriority - Priority Profile for doing background writes.

+    See __init__ for fields
 
 Methods defined here:
+
__getitem__(self, key)
-
__init__(self, chainPollTime, defaultChunkSize, priorityPct, charityRate=1.85, charityTime=0.0003)
__init__ - Create a BackgroundIOPriority

-Some terms: throughput - Bandwidth out (Megs per second)
-            interactivity - CPU time available for other tasks (calculations, other I/O, etc)

-@param chainPollTime - float > 0, When chaining, this is the sleep time between checking if prior is finished.
-    Too low and the polling takes up CPU time, too high and you'll lose a little time in between chained writes, while gaining interactivity elsewhere.

-@param defaultChunkSize - integer > 0, When providing a straight string/bytes to bgwrite (instead of chunking yourself, or using bgwrite_chunk) this will
-    be used as the max size of each chunk. Each chunk is written and a flush is issued (if the stream supports it).
-    Increasing this increases throughput while decreasing interactivity

-@param priorityPct - integer > 0, generally 0-100. When this number is high, throughput for the operation will be higher. When it is lower,
-   interactivity is higher, e.x. if you have a calculation going and a write going, the lower this number the longer the write will take, but the more
-   calculations will be performed during that period.

-@param charityRate - float >= 0, Every couple of blocks written, the current throughput is checked and if things have been going swiftly
-   a short sleep will be incurred. Increasing this number causes that check to happen more often.

-   This number is related to both the number of blocks and the priorityPct. The default, should be fine, but you may find it better
-   as a different value in certain cases. Increasing or decreasing could either increase or decrease interactivity, depending on those other factors.
-   Generally, however, increasing this increases interactivity and ability to write in parallel, at the cost of throughput.

-@param charityTime - float >= 0 - Used to calculate the time to sleep when the charity period hits. The equation is:
-    sleepTime = charityTime * ((dataWritten / delta) / ( (dataWritten / delta) * priorityPctDec))
-     Where dataWritten = number of bytes written already, delta = total time spent writing (not including charity time sleeping)
-     and priorityPctDec = priorityPct / 100.

-     Increasing this can increase interactivity and allow more parallel operations at the cost of throughput.
-     The default should be fine for the majority of cases, but it is tunable.

+
__init__(self, chainPollTime, defaultChunkSize, bandwidthPct, numChunksRateSmoothing=5)
__init__ - Create a BackgroundIOPriority.

+Some terms: throughput - Bandwidth out (Megs per second)
+            interactivity - CPU time available for other tasks (calculations, other I/O, etc)

+@param chainPollTime - float > 0, When chaining, this is the sleep time between checking if prior is finished.
+    Too low and the polling takes up CPU time, too high and you'll lose a little time in between chained writes, while gaining interactivity elsewhere.

+@param defaultChunkSize - integer > 0, When providing a straight string/bytes to bgwrite (instead of chunking yourself, or using bgwrite_chunk) this will
+    be used as the max size of each chunk. Each chunk is written and a flush is issued (if the stream supports it).
+    Increasing this increases throughput while decreasing interactivity

+@param bandwidthPct - integer > 0 and < 100. This is the percentage of overall bandwidth that this task will attempt to use.

+  A high number means higher throughput at the cost of less interactivity for other tasks, a low number means the opposite.

+  So, for example, a bandwidthPct of "50" will attempt to use "50%" of the available bandwidth. Note, this does not represent theoretical
+  max bandwidth, i.e. the max rate of the I/O device, but the amount of available bandwidth available to this application. For example,
+  if this is given "100%", no throttling is performed. If this is given "80%", then it calculates the average time to write a single chunk,
+  ( see #numChunksRateSmoothing for how many chunks are used in evaluating this average ), and then sleeps for 20% of that time at the end
+  of every chunk.

+@param numChunksRateSmoothing - integer >= 1 , Default 5. This is the number of chunks which are used in calculating the current throughput rate.
+  See #bandwidthPct for the other half of the story. The higher this number, the more "fair" your application will be against a constant
+  rate of I/O by other applications, but the less able it may be to play fair when the external I/O is spiking.

+  Also, consider that this is related to the #defaultChunkSize, as it is not a constant period of time. The default of "5" should be okay,
+  but you may want to tune it if you use really large or really small chunk sizes.


An "interactivity score" is defined to be (number of calculations) / (time to write data).
-
__setitem__(self, key, value)
+
__setitem__(self, key, value)
-
-Data descriptors defined here:
-
chainPollTime
+
+Data descriptors defined here:
+
bandwidthPct
-
charityRate
+
chainPollTime
-
charityTime
+
defaultChunkSize
-
defaultChunkSize
+
numChunksRateSmoothing
-
priorityPct
-
-

- - - +
 
-class BackgroundWriteProcess(threading.Thread)

+ + + - - - - + + +
 
+class BackgroundWriteProcess(threading.Thread)
   BackgroundWriteProcess - A thread and data store representing a background write task. You should probably use one of the bgwrite* methods and not this directly.

-Attributes:

-    remainingData  <deque> - A queue representing the data yet to be written

-    startedWriting <bool>  - Starts False, changes to True when writing has started (thread has started and any pending prior chain has completed)

-    finished    <bool>   - Starts False, changes to True after writing has completed, and if closeWhenFinished is True the handle is also closed.
 
 
Method resolution order:
-
BackgroundWriteProcess
-
threading.Thread
-
builtins.object
+
   BackgroundWriteProcess(fileObj, dataBlocks, closeWhenFinished=False, chainAfter=None, ioPrio=4)

+BackgroundWriteProcess - A thread and data store representing a background write task. You should probably use one of the bgwrite* methods and not this directly.

+Attributes:

+    remainingData  <deque> - A queue representing the data yet to be written

+    startedWriting <bool>  - Starts False, changes to True when writing has started (thread has started and any pending prior chain has completed)

+    finished    <bool>   - Starts False, changes to True after writing has completed, and if closeWhenFinished is True the handle is also closed.
 
 
Method resolution order:
+
BackgroundWriteProcess
+
threading.Thread
+
builtins.object
-
-Methods defined here:
-
__init__(self, fileObj, dataBlocks, closeWhenFinished=False, chainAfter=None, ioPrio=4)
__init__ - Create the BackgroundWriteProcess thread. You should probably use bgwrite or bgwrite_chunk instead of calling this directly.

-@param fileObj <stream> - A stream, like a file, to write into. Hopefully it supports flushing, but it is not a requirement.

-@param dataBlocks <bytes/str/list<bytes/str>> - If a list of bytes/str, those are treated as the data blocks, written in order with heuristics for interactivity in between blocks.  If bytes/str are provided not in a list form, they will be split based on the rules of the associated #ioPrio

-@param closeWhenFinished <bool> - Default False. If True, the fileObj will be closed after writing has completed.

-@param chainAfter <None/BackgroundWriteProcess> - If provided, will hold off writing until the provided BackgroundWriteProcess has completed (used for queueing multiple writes whilst retaining order)

-@param ioPrio <int/BackgroundIOPriority> - If an integer (1-10), a predefined BackgroundIOPriority will be used. 1 is highest throughput, 10 is most interactivity. You can also pass in your own BackgroundIOPriority object if you want to define a custom profile.


-@raises ValueError - If ioPrio is neither a BackgroundIOPriority nor integer 1-10 inclusive
-                   - If chainAfter is not a BackgroundWriteProcess or None
+
+Methods defined here:
+
__init__(self, fileObj, dataBlocks, closeWhenFinished=False, chainAfter=None, ioPrio=4)
__init__ - Create the BackgroundWriteProcess thread. You should probably use bgwrite or bgwrite_chunk instead of calling this directly.

+@param fileObj <stream> - A stream, like a file, to write into. Hopefully it supports flushing, but it is not a requirement.

+@param dataBlocks <bytes/str/list<bytes/str>> - If a list of bytes/str, those are treated as the data blocks, written in order with heuristics for interactivity in between blocks.  If bytes/str are provided not in a list form, they will be split based on the rules of the associated #ioPrio

+@param closeWhenFinished <bool> - Default False. If True, the fileObj will be closed after writing has completed.

+@param chainAfter <None/BackgroundWriteProcess> - If provided, will hold off writing until the provided BackgroundWriteProcess has completed (used for queueing multiple writes whilst retaining order)

+@param ioPrio <int/BackgroundIOPriority> - If an integer (1-10), a predefined BackgroundIOPriority will be used. 1 is highest throughput, 10 is most interactivity. You can also pass in your own BackgroundIOPriority object if you want to define a custom profile.


+@raises ValueError - If ioPrio is neither a BackgroundIOPriority nor integer 1-10 inclusive
+                   - If chainAfter is not a BackgroundWriteProcess or None
+ +
run(self)
run - Starts the thread. bgwrite and bgwrite_chunk automatically start the thread.
+ +
+Methods inherited from threading.Thread:
+
__repr__(self)
Return repr(self).
+ +
getName(self)
+ +
isAlive(self)
Return whether the thread is alive.

+This method is deprecated, use is_alive() instead.
+ +
isDaemon(self)
+ +
is_alive(self)
Return whether the thread is alive.

+This method returns True just before the run() method starts until just
+after the run() method terminates. The module function enumerate()
+returns a list of all alive threads.
+ +
join(self, timeout=None)
Wait until the thread terminates.

+This blocks the calling thread until the thread whose join() method is
+called terminates -- either normally or through an unhandled exception
+or until the optional timeout occurs.

+When the timeout argument is present and not None, it should be a
+floating point number specifying a timeout for the operation in seconds
+(or fractions thereof). As join() always returns None, you must call
+is_alive() after join() to decide whether a timeout happened -- if the
+thread is still alive, the join() call timed out.

+When the timeout argument is not present or None, the operation will
+block until the thread terminates.

+A thread can be join()ed many times.

+join() raises a RuntimeError if an attempt is made to join the current
+thread as that would cause a deadlock. It is also an error to join() a
+thread before it has been started and attempts to do so raises the same
+exception.
-
run(self)
run - Starts the thread. bgwrite and bgwrite_chunk automatically start the thread.
+
setDaemon(self, daemonic)
-

- - - +
setName(self, name)
+ +
start(self)
Start the thread's activity.

+It must be called at most once per thread object. It arranges for the
+object's run() method to be invoked in a separate thread of control.

+This method will raise a RuntimeError if called more than once on the
+same thread object.
+ +
+Data descriptors inherited from threading.Thread:
+
__dict__
+
dictionary for instance variables (if defined)
+
+
__weakref__
+
list of weak references to the object (if defined)
+
+
daemon
+
A boolean value indicating whether this thread is a daemon thread.

+This must be set before start() is called, otherwise RuntimeError is
+raised. Its initial value is inherited from the creating thread; the
+main thread is not a daemon thread and therefore all threads created in
+the main thread default to daemon = False.

+The entire Python program exits when no alive non-daemon threads are
+left.
+
+
ident
+
Thread identifier of this thread or None if it has not been started.

+This is a nonzero integer. See the get_ident() function. Thread
+identifiers may be recycled when a thread exits and another thread is
+created. The identifier is available even after the thread has exited.
+
+
name
+
A string used for identification purposes only.

+It has no semantics. Multiple threads may be given the same name. The
+initial name is set by the constructor.
+
+
 
-Functions

+ + + - - +
 
+Functions
       
bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4)
bgwrite - Start a background writing process

-    @param fileObj <stream> - A stream backed by an fd

-    @param data    <str/bytes/list> - The data to write. If a list is given, each successive element will be written to the fileObj and flushed. If a string/bytes is provided, it will be chunked according to the #BackgroundIOPriority chosen. If you would like a different chunking than the chosen ioPrio provides, use #bgwrite_chunk function instead.

-       Chunking makes the data available quicker on the other side, reduces iowait on this side, and thus increases interactivity (at penalty of throughput).

-    @param closeWhenFinished <bool> - If True, the given fileObj will be closed after all the data has been written. Default False.

-    @param chainAfter  <None/BackgroundWriteProcess> - If a BackgroundWriteProcess object is provided (the return of bgwrite* functions), this data will be held for writing until the data associated with the provided object has completed writing.
-    Use this to queue several background writes, but retain order within the resulting stream.


-    @return - BackgroundWriteProcess - An object representing the state of this operation. @see BackgroundWriteProcess
-
bgwrite_chunk(fileObj, data, chunkSize, closeWhenFinished=False, chainAfter=None, ioPrio=4)
bgwrite_chunk - Chunk up the data into even #chunkSize blocks, and then pass it onto #bgwrite.
-    Use this to break up a block of data into smaller segments that can be written and flushed.
-    The smaller the chunks, the more interactive (recipient gets data quicker, iowait goes down for you) at cost of throughput.

-    bgwrite will automatically chunk according to the given ioPrio, but you can use this for finer-tuned control.

-    @see bgwrite

-@param data <string/bytes> - The data to chunk up

+
       
bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4)
bgwrite - Start a background writing process

+    @param fileObj <stream> - A stream backed by an fd

+    @param data    <str/bytes/list> - The data to write. If a list is given, each successive element will be written to the fileObj and flushed. If a string/bytes is provided, it will be chunked according to the #BackgroundIOPriority chosen. If you would like a different chunking than the chosen ioPrio provides, use #bgwrite_chunk function instead.

+       Chunking makes the data available quicker on the other side, reduces iowait on this side, and thus increases interactivity (at penalty of throughput).

+    @param closeWhenFinished <bool> - If True, the given fileObj will be closed after all the data has been written. Default False.

+    @param chainAfter  <None/BackgroundWriteProcess> - If a BackgroundWriteProcess object is provided (the return of bgwrite* functions), this data will be held for writing until the data associated with the provided object has completed writing.
+    Use this to queue several background writes, but retain order within the resulting stream.


+    @return - BackgroundWriteProcess - An object representing the state of this operation. @see BackgroundWriteProcess
+
bgwrite_chunk(fileObj, data, chunkSize, closeWhenFinished=False, chainAfter=None, ioPrio=4)
bgwrite_chunk - Chunk up the data into even #chunkSize blocks, and then pass it onto #bgwrite.
+    Use this to break up a block of data into smaller segments that can be written and flushed.
+    The smaller the chunks, the more interactive (recipient gets data quicker, iowait goes down for you) at cost of throughput.

+    bgwrite will automatically chunk according to the given ioPrio, but you can use this for finer-tuned control.

+    @see bgwrite

+@param data <string/bytes> - The data to chunk up

@param chunkSize <integer> - The max siZe of each chunk.
-
chunk_data(data, chunkSize)
chunk_data - Chunks a string/bytes into a list of string/bytes, each member up to #chunkSize in length.

-e.x.    chunk_data("123456789", 2) = ["12", "34", "56", "78", "9"]
-

- - - +
chunk_data(data, chunkSize)
chunk_data - Chunks a string/bytes into a list of string/bytes, each member up to #chunkSize in length.

+e.x.    chunk_data("123456789", 2) = ["12", "34", "56", "78", "9"]
+
 
-Data

+ + + - -
 
+Data
       __all__ = ('BackgroundWriteProcess', 'BackgroundIOPriority', 'bgwrite', 'bgwrite_chunk', 'chunk_data')
- +        +__all__ = ('BackgroundWriteProcess', 'BackgroundIOPriority', 'bgwrite', 'bgwrite_chunk', 'chunk_data') +

\ No newline at end of file diff --git a/doc/nonblock.common.html b/doc/nonblock.common.html index 12ff5d7..960d062 100644 --- a/doc/nonblock.common.html +++ b/doc/nonblock.common.html @@ -1,35 +1,33 @@ -Python: module nonblock.common - - +Python: module nonblock.common + + - - -
 
- 
nonblock.common
index
-

-

- - - +
 
-Functions
+ +
 
+ 
nonblock.common
index
+

+

+ + + - - +
 
+Functions
       
detect_stream_mode(stream)
detect_stream_mode - Detect the mode on a given stream

-    @param stream <object> - A stream object

-    If "mode" is present, that will be used.

+
       
detect_stream_mode(stream)
detect_stream_mode - Detect the mode on a given stream

+    @param stream <object> - A stream object

+    If "mode" is present, that will be used.

@return <type> - "Bytes" type or "str" type
-

- - - +
 
-Data

+ + + - -
 
+Data
       __all__ = ('detect_stream_mode',)
- +        +__all__ = ('detect_stream_mode',) +

\ No newline at end of file diff --git a/doc/nonblock.html b/doc/nonblock.html index f62d572..ba8daab 100644 --- a/doc/nonblock.html +++ b/doc/nonblock.html @@ -1,176 +1,173 @@ -Python: package nonblock - - +Python: package nonblock + + - - -
 
- 
nonblock (version 3.0.0)
index
-

Copyright (c) 2015-2016 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution.

+ + +
 
+ 
nonblock (version 4.0.1)
index
+

Copyright (c) 2015-2016, 2017, 2019 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution.

Contains pure-python functions for non-blocking IO in python

-

- - - +

+

 
-Package Contents
+ + - -
 
+Package Contents
       
BackgroundRead
-
BackgroundWrite
-
common
-
read
-

- - - + +
 
-Classes
       
BackgroundRead
+
BackgroundWrite
+
common
+
read
+

+ + + - - +
 
+Classes
       
-
builtins.object -
-
-
nonblock.BackgroundWrite.BackgroundIOPriority +
       
+
builtins.object +
+
+
nonblock.BackgroundWrite.BackgroundIOPriority
-

- - - +

+

 
-class BackgroundIOPriority(builtins.object)
+ + - - - - + + +
 
+class BackgroundIOPriority(builtins.object)
   BackgroundIOPriority - Priority Profile for doing background writes.

-    See __init__ for fields
 
 Methods defined here:
-
__getitem__(self, key)
+
   BackgroundIOPriority(chainPollTime, defaultChunkSize, bandwidthPct, numChunksRateSmoothing=5)

+BackgroundIOPriority - Priority Profile for doing background writes.

+    See __init__ for fields
 
 Methods defined here:
+
__getitem__(self, key)
-
__init__(self, chainPollTime, defaultChunkSize, priorityPct, charityRate=1.85, charityTime=0.0003)
__init__ - Create a BackgroundIOPriority

-Some terms: throughput - Bandwidth out (Megs per second)
-            interactivity - CPU time available for other tasks (calculations, other I/O, etc)

-@param chainPollTime - float > 0, When chaining, this is the sleep time between checking if prior is finished.
-    Too low and the polling takes up CPU time, too high and you'll lose a little time in between chained writes, while gaining interactivity elsewhere.

-@param defaultChunkSize - integer > 0, When providing a straight string/bytes to bgwrite (instead of chunking yourself, or using bgwrite_chunk) this will
-    be used as the max size of each chunk. Each chunk is written and a flush is issued (if the stream supports it).
-    Increasing this increases throughput while decreasing interactivity

-@param priorityPct - integer > 0, generally 0-100. When this number is high, throughput for the operation will be higher. When it is lower,
-   interactivity is higher, e.x. if you have a calculation going and a write going, the lower this number the longer the write will take, but the more
-   calculations will be performed during that period.

-@param charityRate - float >= 0, Every couple of blocks written, the current throughput is checked and if things have been going swiftly
-   a short sleep will be incurred. Increasing this number causes that check to happen more often.

-   This number is related to both the number of blocks and the priorityPct. The default, should be fine, but you may find it better
-   as a different value in certain cases. Increasing or decreasing could either increase or decrease interactivity, depending on those other factors.
-   Generally, however, increasing this increases interactivity and ability to write in parallel, at the cost of throughput.

-@param charityTime - float >= 0 - Used to calculate the time to sleep when the charity period hits. The equation is:
-    sleepTime = charityTime * ((dataWritten / delta) / ( (dataWritten / delta) * priorityPctDec))
-     Where dataWritten = number of bytes written already, delta = total time spent writing (not including charity time sleeping)
-     and priorityPctDec = priorityPct / 100.

-     Increasing this can increase interactivity and allow more parallel operations at the cost of throughput.
-     The default should be fine for the majority of cases, but it is tunable.

+
__init__(self, chainPollTime, defaultChunkSize, bandwidthPct, numChunksRateSmoothing=5)
__init__ - Create a BackgroundIOPriority.

+Some terms: throughput - Bandwidth out (Megs per second)
+            interactivity - CPU time available for other tasks (calculations, other I/O, etc)

+@param chainPollTime - float > 0, When chaining, this is the sleep time between checking if prior is finished.
+    Too low and the polling takes up CPU time, too high and you'll lose a little time in between chained writes, while gaining interactivity elsewhere.

+@param defaultChunkSize - integer > 0, When providing a straight string/bytes to bgwrite (instead of chunking yourself, or using bgwrite_chunk) this will
+    be used as the max size of each chunk. Each chunk is written and a flush is issued (if the stream supports it).
+    Increasing this increases throughput while decreasing interactivity

+@param bandwidthPct - integer > 0 and <= 100. This is the percentage of overall bandwidth that this task will attempt to use.

+  A high number means higher throughput at the cost of less interactivity for other tasks, a low number means the opposite.

+  So, for example, a bandwidthPct of "50" will attempt to use "50%" of the available bandwidth. Note, this does not represent theoretical
+  max bandwidth, i.e. the max rate of the I/O device, but the amount of bandwidth available to this application. For example,
+  if this is given "100%", no throttling is performed. If this is given "80%", then it calculates the average time to write a single chunk,
+  ( see #numChunksRateSmoothing for how many chunks are used in evaluating this average ), and then sleeps for 20% of that time at the end
+  of every chunk.

+@param numChunksRateSmoothing - integer >= 1 , Default 5. This is the number of chunks which are used in calculating the current throughput rate.
+  See #bandwidthPct for the other half of the story. The higher this number, the more "fair" your application will be against a constant
+  rate of I/O by other applications, but the less able it may be to play fair when the external I/O is spiking.

+  Also, consider that this is related to the #defaultChunkSize, as it is not a constant period of time. The default of "5" should be okay,
+  but you may want to tune it if you use really large or really small chunk sizes.


An "interactivity score" is defined to be (number of calculations) / (time to write data).
-
__setitem__(self, key, value)
+
__setitem__(self, key, value)
-
-Data descriptors defined here:
-
chainPollTime
+
+Data descriptors defined here:
+
bandwidthPct
-
charityRate
+
chainPollTime
-
charityTime
+
defaultChunkSize
-
defaultChunkSize
+
numChunksRateSmoothing
-
priorityPct
-
-

- - - +
 
-Functions

+ + + - - +
 
+Functions
       
bgread(stream, blockSizeLimit=65535, pollTime=0.03, closeStream=True)
bgread - Start a thread which will read from the given stream in a non-blocking fashion, and automatically populate data in the returned object.

-    @param stream <object> - A stream on which to read. Socket, file, etc.

-    @param blockSizeLimit <None/int> - Number of bytes. Default 65535.

-        If None, the stream will be read from until there is no more available data (not closed, but you've read all that's been flushed to straem). This is okay for smaller datasets, but this number effectively controls the amount of CPU time spent in I/O on this stream VS everything else in your application. The default of 65535 bytes is a fair amount of data.

-    @param pollTime <float> - Default .03 (30ms) After all available data has been read from the stream, wait this many seconds before checking again for more data.
-        
-        A low number here means a high priority, i.e. more cycles will be devoted to checking and collecting the background data. Since this is a non-blocking read, this value is the "block", which will return execution context to the remainder of the application. The default of 100ms should be fine in most cases. If it's really idle data collection, you may want to try a value of 1 second.

-    @param closeStream <bool> - Default True. If True, the "close" method on the stream object will be called when the other side has closed and all data has been read.



-NOTES --

-        blockSizeLimit / pollTime is your effective max-throughput. Real throughput will be lower than this number, as the actual throughput is be defined by:

-        T = (blockSizeLimit / pollTime) - DeviceReadTime(blockSizeLimit)

-    Using the defaults of .03 and 65535 means you'll read up to 2 MB per second. Keep in mind that the more time spent in I/O means less time spent doing other tasks.


-    @return - The return of this function is a BackgroundReadData object. This object contains an attribute "blocks" which is a list of the non-zero-length blocks that were read from the stream. The object also contains a calculated property, "data", which is a string/bytes (depending on stream mode) of all the data currently read. The property "isFinished" will be set to True when the stream has been closed. The property "error" will be set to any exception that occurs during reading which will terminate the thread. @see BackgroundReadData for more info.
-
bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4)
bgwrite - Start a background writing process

-    @param fileObj <stream> - A stream backed by an fd

-    @param data    <str/bytes/list> - The data to write. If a list is given, each successive element will be written to the fileObj and flushed. If a string/bytes is provided, it will be chunked according to the #BackgroundIOPriority chosen. If you would like a different chunking than the chosen ioPrio provides, use #bgwrite_chunk function instead.

-       Chunking makes the data available quicker on the other side, reduces iowait on this side, and thus increases interactivity (at penalty of throughput).

-    @param closeWhenFinished <bool> - If True, the given fileObj will be closed after all the data has been written. Default False.

-    @param chainAfter  <None/BackgroundWriteProcess> - If a BackgroundWriteProcess object is provided (the return of bgwrite* functions), this data will be held for writing until the data associated with the provided object has completed writing.
-    Use this to queue several background writes, but retain order within the resulting stream.


-    @return - BackgroundWriteProcess - An object representing the state of this operation. @see BackgroundWriteProcess
-
bgwrite_chunk(fileObj, data, chunkSize, closeWhenFinished=False, chainAfter=None, ioPrio=4)
bgwrite_chunk - Chunk up the data into even #chunkSize blocks, and then pass it onto #bgwrite.
-    Use this to break up a block of data into smaller segments that can be written and flushed.
-    The smaller the chunks, the more interactive (recipient gets data quicker, iowait goes down for you) at cost of throughput.

-    bgwrite will automatically chunk according to the given ioPrio, but you can use this for finer-tuned control.

-    @see bgwrite

-@param data <string/bytes> - The data to chunk up

+
       
bgread(stream, blockSizeLimit=65535, pollTime=0.03, closeStream=True)
bgread - Start a thread which will read from the given stream in a non-blocking fashion, and automatically populate data in the returned object.

+    @param stream <object> - A stream on which to read. Socket, file, etc.

+    @param blockSizeLimit <None/int> - Number of bytes. Default 65535.

+        If None, the stream will be read from until there is no more available data (not closed, but you've read all that's been flushed to stream). This is okay for smaller datasets, but this number effectively controls the amount of CPU time spent in I/O on this stream VS everything else in your application. The default of 65535 bytes is a fair amount of data.

+    @param pollTime <float> - Default .03 (30ms) After all available data has been read from the stream, wait this many seconds before checking again for more data.

+        A low number here means a high priority, i.e. more cycles will be devoted to checking and collecting the background data. Since this is a non-blocking read, this value is the "block", which will return execution context to the remainder of the application. The default of 30ms should be fine in most cases. If it's really idle data collection, you may want to try a value of 1 second.

+    @param closeStream <bool> - Default True. If True, the "close" method on the stream object will be called when the other side has closed and all data has been read.



+NOTES --

+        blockSizeLimit / pollTime is your effective max-throughput. Real throughput will be lower than this number, as the actual throughput is defined by:

+        T = (blockSizeLimit / pollTime) - DeviceReadTime(blockSizeLimit)

+    Using the defaults of .03 and 65535 means you'll read up to 2 MB per second. Keep in mind that the more time spent in I/O means less time spent doing other tasks.


+    @return - The return of this function is a BackgroundReadData object. This object contains an attribute "blocks" which is a list of the non-zero-length blocks that were read from the stream. The object also contains a calculated property, "data", which is a string/bytes (depending on stream mode) of all the data currently read. The property "isFinished" will be set to True when the stream has been closed. The property "error" will be set to any exception that occurs during reading which will terminate the thread. @see BackgroundReadData for more info.
+
bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4)
bgwrite - Start a background writing process

+    @param fileObj <stream> - A stream backed by an fd

+    @param data    <str/bytes/list> - The data to write. If a list is given, each successive element will be written to the fileObj and flushed. If a string/bytes is provided, it will be chunked according to the #BackgroundIOPriority chosen. If you would like a different chunking than the chosen ioPrio provides, use #bgwrite_chunk function instead.

+       Chunking makes the data available quicker on the other side, reduces iowait on this side, and thus increases interactivity (at penalty of throughput).

+    @param closeWhenFinished <bool> - If True, the given fileObj will be closed after all the data has been written. Default False.

+    @param chainAfter  <None/BackgroundWriteProcess> - If a BackgroundWriteProcess object is provided (the return of bgwrite* functions), this data will be held for writing until the data associated with the provided object has completed writing.
+    Use this to queue several background writes, but retain order within the resulting stream.


+    @return - BackgroundWriteProcess - An object representing the state of this operation. @see BackgroundWriteProcess
+
bgwrite_chunk(fileObj, data, chunkSize, closeWhenFinished=False, chainAfter=None, ioPrio=4)
bgwrite_chunk - Chunk up the data into even #chunkSize blocks, and then pass it onto #bgwrite.
+    Use this to break up a block of data into smaller segments that can be written and flushed.
+    The smaller the chunks, the more interactive (recipient gets data quicker, iowait goes down for you) at cost of throughput.

+    bgwrite will automatically chunk according to the given ioPrio, but you can use this for finer-tuned control.

+    @see bgwrite

+@param data <string/bytes> - The data to chunk up

@param chunkSize <integer> - The max siZe of each chunk.
-
nonblock_read(stream, limit=None, forceMode=None)
nonblock_read - Read any data available on the given stream (file, socket, etc) without blocking and regardless of newlines.

-    @param stream <object> - A stream (like a file object or a socket)
-    @param limit <None/int> - Max number of bytes to read. If None or 0, will read as much data is available.
-    @param forceMode <None/mode string> - Default None. Will be autodetected if None. If you want to explicitly force a mode, provide 'b' for binary (bytes) or 't' for text (Str). This determines the return type.

+
nonblock_read(stream, limit=None, forceMode=None)
nonblock_read - Read any data available on the given stream (file, socket, etc) without blocking and regardless of newlines.

+    @param stream <object> - A stream (like a file object or a socket)
+    @param limit <None/int> - Max number of bytes to read. If None or 0, will read as much data as is available.
+    @param forceMode <None/mode string> - Default None. Will be autodetected if None. If you want to explicitly force a mode, provide 'b' for binary (bytes) or 't' for text (Str). This determines the return type.

    @return <str or bytes depending on stream's mode> - Any data available on the stream, or "None" if the stream was closed on the other side and all data has already been read.
-

- - - +
 
-Data

+ + + - -
 
+Data
       __all__ = ('nonblock_read', 'bgwrite', 'bgwrite_chunk', 'BackgroundIOPriority', 'bgread')
- +        +__all__ = ('nonblock_read', 'bgwrite', 'bgwrite_chunk', 'BackgroundIOPriority', 'bgread') +

\ No newline at end of file diff --git a/doc/nonblock.read.html b/doc/nonblock.read.html index d4ba16a..2ded8fc 100644 --- a/doc/nonblock.read.html +++ b/doc/nonblock.read.html @@ -1,45 +1,43 @@ -Python: module nonblock.read - - +Python: module nonblock.read + + - - -
 
- 
nonblock.read
index
-

Copyright (c) 2015-2016 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution.

+ + +
 
+ 
nonblock.read
index
+

Copyright (c) 2015-2016 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution.

read.py Contains pure-python functions for non-blocking reads in python

-

- - - +

+

 
-Modules
+ + - -
 
+Modules
       
select
-

- - - + +
 
-Functions
       
select
+

+ + + - - +
 
+Functions
       
nonblock_read(stream, limit=None, forceMode=None)
nonblock_read - Read any data available on the given stream (file, socket, etc) without blocking and regardless of newlines.

-    @param stream <object> - A stream (like a file object or a socket)
-    @param limit <None/int> - Max number of bytes to read. If None or 0, will read as much data is available.
-    @param forceMode <None/mode string> - Default None. Will be autodetected if None. If you want to explicitly force a mode, provide 'b' for binary (bytes) or 't' for text (Str). This determines the return type.

+
       
nonblock_read(stream, limit=None, forceMode=None)
nonblock_read - Read any data available on the given stream (file, socket, etc) without blocking and regardless of newlines.

+    @param stream <object> - A stream (like a file object or a socket)
+    @param limit <None/int> - Max number of bytes to read. If None or 0, will read as much data is available.
+    @param forceMode <None/mode string> - Default None. Will be autodetected if None. If you want to explicitly force a mode, provide 'b' for binary (bytes) or 't' for text (Str). This determines the return type.

    @return <str or bytes depending on stream's mode> - Any data available on the stream, or "None" if the stream was closed on the other side and all data has already been read.
-

- - - +
 
-Data

+ + + - -
 
+Data
       __all__ = ('nonblock_read',)
- +        +__all__ = ('nonblock_read',) +

\ No newline at end of file diff --git a/experimental/remove_default_chunk_size.patch b/experimental/remove_default_chunk_size.patch new file mode 100644 index 0000000..0957495 --- /dev/null +++ b/experimental/remove_default_chunk_size.patch @@ -0,0 +1,141 @@ +This will remove the default chunk size for each priority level. + +Do not yet use, this is a work-in-progress, and if you do not specify explicit chunk sizes, you will suffer a significant drop in performance. + +diff --git a/nonblock/BackgroundWrite.py b/nonblock/BackgroundWrite.py +index 3e812b4..7fd3e8e 100644 +--- a/nonblock/BackgroundWrite.py ++++ b/nonblock/BackgroundWrite.py +@@ -27,8 +27,9 @@ __all__ = ('BackgroundWriteProcess', 'BackgroundIOPriority', 'bgwrite', 'bgwrite + #if DEBUG: + # import sys + ++_SIZE_MEG = 1024 * 1024 + +-def bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4): ++def bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4, chunkSize=_SIZE_MEG * .5): + ''' + bgwrite - Start a background writing process + +@@ -47,7 +48,7 @@ def bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4): + @return - BackgroundWriteProcess - An object representing the state of this operation. @see BackgroundWriteProcess + ''' + +- thread = BackgroundWriteProcess(fileObj, data, closeWhenFinished, chainAfter, ioPrio) ++ thread = BackgroundWriteProcess(fileObj, data, closeWhenFinished, chainAfter, ioPrio, chunkSize) + thread.start() + + return thread +@@ -66,9 +67,9 @@ def bgwrite_chunk(fileObj, data, chunkSize, closeWhenFinished=False, chainAfter= + + @param chunkSize - The max siZe of each chunk. 
+ ''' +- chunks = chunk_data(data, chunkSize) ++# chunks = chunk_data(data, chunkSize) + +- return bgwrite(fileObj, chunks, closeWhenFinished, chainAfter, ioPrio) ++ return bgwrite(fileObj, data, closeWhenFinished, chainAfter, ioPrio, chunkSize) + + + class BackgroundIOPriority(object): +@@ -78,9 +79,9 @@ class BackgroundIOPriority(object): + See __init__ for fields + ''' + +- __slots__ = ('chainPollTime', 'defaultChunkSize', 'bandwidthPct', 'numChunksRateSmoothing') ++ __slots__ = ('chainPollTime', 'bandwidthPct', 'numChunksRateSmoothing') + +- def __init__(self, chainPollTime, defaultChunkSize, bandwidthPct, numChunksRateSmoothing=5): ++ def __init__(self, chainPollTime, bandwidthPct, numChunksRateSmoothing=5): + ''' + __init__ - Create a BackgroundIOPriority. + +@@ -90,12 +91,6 @@ class BackgroundIOPriority(object): + @param chainPollTime - float > 0, When chaining, this is the sleep time between checking if prior is finished. + Too low and the polling takes up CPU time, too high and you'll lose a little time in between chained writes, while gaining interactivity elsewhere. + +- @param defaultChunkSize - integer > 0, When providing a straight string/bytes to bgwrite (instead of chunking yourself, or using bgwrite_chunk) this will +- be used as the max size of each chunk. Each chunk is written and a flush is issued (if the stream supports it). +- Increasing this increases throughput while decreasing interactivity +- +- @param bandwidthPct - integer > 0 and < 100. This is the percentage of overall bandwidth that this task will attempt to use. +- + A high number means higher throughput at the cost of lest interactivity for other tasks, a low number means the opposite. + + So, for example, a bandwidthPct of "50" will attempt to use "50%" of the available bandwidth. Note, this does not represent theroetical +@@ -108,7 +103,7 @@ class BackgroundIOPriority(object): + See #bandwidthPct for the other half of the story. 
The higher this number, the more "fair" your application will be against a constant + rate of I/O by other applications, but the less able it may be to play fair when the external I/O is spiking. + +- Also, consider that this is related to the #defaultChunkSize, as it is not a constant period of time. The default of "5" should be okay, ++ Also, consider that this is related to the #chunkSize, as it is not a constant period of time. The default of "5" should be okay, + but you may want to tune it if you use really large or really small chunk sizes. + + +@@ -117,7 +112,6 @@ class BackgroundIOPriority(object): + + + self.chainPollTime = chainPollTime +- self.defaultChunkSize = defaultChunkSize + self.bandwidthPct = float(bandwidthPct) + if bandwidthPct <= 0 or bandwidthPct > 100: + raise ValueError('Given bandwidthPct %f must be > 0 and <= 100') +@@ -139,16 +133,16 @@ _SIZE_MEG = 1024 * 1024 + + # BG_IO_PRIOS - Predefined I/O priorities, 1-10. The lower the number, the more throughput at the cost of interactivity + BG_IO_PRIOS = { +- 1 : BackgroundIOPriority(.0009, _SIZE_MEG * 5, 100), # Maximum throughput, no regard for interactivity. +- 2 : BackgroundIOPriority(.0009, _SIZE_MEG * 4, 90), +- 3 : BackgroundIOPriority(.0015, _SIZE_MEG * 3, 78), +- 4 : BackgroundIOPriority(.0015, _SIZE_MEG * 2, 72), +- 5 : BackgroundIOPriority(.0019, _SIZE_MEG * 1.6, 65), +- 6 : BackgroundIOPriority(.0019, _SIZE_MEG * .75, 55), +- 7 : BackgroundIOPriority(.0024, _SIZE_MEG * .69, 45), +- 8 : BackgroundIOPriority(.0024, _SIZE_MEG * .5, 35), +- 9 : BackgroundIOPriority(.0031, _SIZE_MEG * .3, 30), +- 10 : BackgroundIOPriority(.0100, _SIZE_MEG * .25, 20), # Least throughput, most interactivity, very little throughput ++ 1 : BackgroundIOPriority(.0009, 100), # Maximum throughput, no regard for interactivity. 
++ 2 : BackgroundIOPriority(.0009, 90), ++ 3 : BackgroundIOPriority(.0015, 78), ++ 4 : BackgroundIOPriority(.0015, 72), ++ 5 : BackgroundIOPriority(.0019, 65), ++ 6 : BackgroundIOPriority(.0019, 55), ++ 7 : BackgroundIOPriority(.0024, 45), ++ 8 : BackgroundIOPriority(.0024, 35), ++ 9 : BackgroundIOPriority(.0031, 30), ++ 10 : BackgroundIOPriority(.0100, 20), # Least throughput, most interactivity, very little throughput + } + + +@@ -166,7 +160,7 @@ class BackgroundWriteProcess(threading.Thread): + ''' + # Design question: What about errors? + +- def __init__(self, fileObj, dataBlocks, closeWhenFinished=False, chainAfter=None, ioPrio=4): ++ def __init__(self, fileObj, dataBlocks, closeWhenFinished=False, chainAfter=None, ioPrio=4, chunkSize=_SIZE_MEG * .5): + ''' + __init__ - Create the BackgroundWriteProcess thread. You should probably use bgwrite or bgwrite_chunk instead of calling this directly. + +@@ -180,6 +174,12 @@ class BackgroundWriteProcess(threading.Thread): + + @param ioPrio - If an integer (1-10), a predefined BackgroundIOPriority will be used. 1 is highest throughput, 10 is most interactivity. You can also pass in your own BackgroundIOPriority object if you want to define a custom profile. + ++ @param chunkSize - integer > 0, When providing a straight string/bytes to bgwrite (instead of chunking yourself, or using bgwrite_chunk) this will ++ be used as the max size of each chunk. Each chunk is written and a flush is issued (if the stream supports it). ++ Increasing this increases throughput while decreasing interactivity ++ ++ @param bandwidthPct - integer > 0 and < 100. This is the percentage of overall bandwidth that this task will attempt to use. ++ + + @raises ValueError - If ioPrio is neither a BackgroundIOPriority nor integer 1-10 inclusive + - If chainAfter is not a BackgroundWriteProcess or None +@@ -196,7 +196,8 @@ class BackgroundWriteProcess(threading.Thread): + raise ValueError('Invalid ioPrio: %s. 
Available priority levels are: %s' %(str(ioPrio), str(list(BG_IO_PRIOS.keys()))) ) + + if type(dataBlocks) not in (list, tuple): +- dataBlocks = chunk_data(dataBlocks, self.backgroundIOPriority.defaultChunkSize) ++ dataBlocks = chunk_data(dataBlocks, chunkSize) ++ + self.remainingData = deque(dataBlocks) + + self.closeWhenFinished = closeWhenFinished diff --git a/mkdoc.sh b/mkdoc.sh new file mode 100755 index 0000000..344ecf2 --- /dev/null +++ b/mkdoc.sh @@ -0,0 +1,56 @@ +#!/bin/bash + +shopt -s nullglob +ALL_MODS="$(echo nonblock/**.py | tr ' ' '\n' | sed -e 's|/|.|g' -e 's|.py$||g' -e 's|.__init__$||g' | tr '\n' ' ')" + +pydoc -w ${ALL_MODS} +mv nonblock*.html doc/ +pushd doc >/dev/null 2>&1 +rm -f index.html + +for fname in `echo *.html`; +do + python < emIndex: + parentNode.removeChild( parentNode.children[i] ) + i -= 1 + + + with open(filename, 'wt') as f: + f.write(parser.getHTML()) + + +EOT + + +done + +ln -s nonblock.html index.html + +popd >/dev/null 2>&1 diff --git a/nonblock/BackgroundRead.py b/nonblock/BackgroundRead.py index f8da164..ce76b80 100644 --- a/nonblock/BackgroundRead.py +++ b/nonblock/BackgroundRead.py @@ -3,7 +3,7 @@ read.py Contains pure-python functions for non-blocking reads in python - + ''' # vim: ts=4 sw=4 expandtab @@ -27,7 +27,7 @@ def bgread(stream, blockSizeLimit=65535, pollTime=.03, closeStream=True): If None, the stream will be read from until there is no more available data (not closed, but you've read all that's been flushed to straem). This is okay for smaller datasets, but this number effectively controls the amount of CPU time spent in I/O on this stream VS everything else in your application. The default of 65535 bytes is a fair amount of data. @param pollTime - Default .03 (30ms) After all available data has been read from the stream, wait this many seconds before checking again for more data. - + A low number here means a high priority, i.e. more cycles will be devoted to checking and collecting the background data. 
Since this is a non-blocking read, this value is the "block", which will return execution context to the remainder of the application. The default of 100ms should be fine in most cases. If it's really idle data collection, you may want to try a value of 1 second. @param closeStream - Default True. If True, the "close" method on the stream object will be called when the other side has closed and all data has been read. @@ -84,7 +84,7 @@ class BackgroundReadData(object): data - A calculated property, which is a bytes/str (depending on stream mode). It is the joining of all the read blocks, and contains all the data read to-date. isFinished - starts False, and becomes True after all data has been read from the stream. Will remain False if there is an exception raised during I/O - + error - starts None, and is set to any exception that is raised during reading (which will also terminate the thread) ''' @@ -109,7 +109,7 @@ def data(self): ''' return self.emptyStr.join(self.blocks) - + diff --git a/nonblock/BackgroundWrite.py b/nonblock/BackgroundWrite.py index 691055b..3e812b4 100644 --- a/nonblock/BackgroundWrite.py +++ b/nonblock/BackgroundWrite.py @@ -1,5 +1,5 @@ ''' - Copyright (c) 2015 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution. + Copyright (c) 2015, 2016, 2017 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution. 
BackgroundWrite.py Contains pure-python functions for non-blocking background writing (writing multiple streams at once; interactive writing allowing a high amount of CPU time for calculations/other tasks @@ -7,14 +7,27 @@ ''' # vim: ts=4 sw=4 expandtab -#import sys import threading import time from collections import deque +# TODO: I'd like to maybe remove defaultChunkSize from BACKGROUND_IO_PRIO and instead keep it strictly priority, +# and forcing chunk size to be specified every time (basically, making "bgwrite_chunk" the prototype ). +# +# See "remove_default_chunk_size.patch" in the default directory. The issue I have with this is there +# seems to be a noticable drop in performance with this applied, and the priority levels have +# much less meaning. + + __all__ = ('BackgroundWriteProcess', 'BackgroundIOPriority', 'bgwrite', 'bgwrite_chunk', 'chunk_data') +# Uncomment the "DEBUG" sections you want to see below. Search for DEBUG. +#DEBUG = False +#if DEBUG: +# import sys + + def bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4): ''' bgwrite - Start a background writing process @@ -33,7 +46,7 @@ def bgwrite(fileObj, data, closeWhenFinished=False, chainAfter=None, ioPrio=4): @return - BackgroundWriteProcess - An object representing the state of this operation. @see BackgroundWriteProcess ''' - + thread = BackgroundWriteProcess(fileObj, data, closeWhenFinished, chainAfter, ioPrio) thread.start() @@ -65,11 +78,11 @@ class BackgroundIOPriority(object): See __init__ for fields ''' - __slots__ = ('chainPollTime', 'defaultChunkSize', 'priorityPct', 'charityRate', 'charityTime') + __slots__ = ('chainPollTime', 'defaultChunkSize', 'bandwidthPct', 'numChunksRateSmoothing') - def __init__(self, chainPollTime, defaultChunkSize, priorityPct, charityRate=1.85, charityTime=.0003): + def __init__(self, chainPollTime, defaultChunkSize, bandwidthPct, numChunksRateSmoothing=5): ''' - __init__ - Create a BackgroundIOPriority. 
+ __init__ - Create a BackgroundIOPriority. Some terms: throughput - Bandwidth out (Megs per second) interactivity - CPU time available for other tasks (calculations, other I/O, etc) @@ -81,24 +94,23 @@ def __init__(self, chainPollTime, defaultChunkSize, priorityPct, charityRate=1.8 be used as the max size of each chunk. Each chunk is written and a flush is issued (if the stream supports it). Increasing this increases throughput while decreasing interactivity - @param priorityPct - integer > 0, generally 0-100. When this number is high, throughput for the operation will be higher. When it is lower, - interactivity is higher, e.x. if you have a calculation going and a write going, the lower this number the longer the write will take, but the more - calculations will be performed during that period. + @param bandwidthPct - integer > 0 and < 100. This is the percentage of overall bandwidth that this task will attempt to use. + + A high number means higher throughput at the cost of lest interactivity for other tasks, a low number means the opposite. - @param charityRate - float >= 0, Every couple of blocks written, the current throughput is checked and if things have been going swiftly - a short sleep will be incurred. Increasing this number causes that check to happen more often. + So, for example, a bandwidthPct of "50" will attempt to use "50%" of the available bandwidth. Note, this does not represent theroetical + max bandwidth, i.e. the max rate of the I/O device, but the amount of available bandwidth available to this application. For example, + if this is given "100%", no throttling is performed. If this is given "80%", then it calculates the average time to write a single chunk, + ( see #numChunksRateSmoothing for how many chunks are used in evaluating this average ), and sleeps for then 20% of that time at the end + of every chunk. - This number is related to both the number of blocks and the priorityPct. 
The default, should be fine, but you may find it better - as a different value in certain cases. Increasing or decreasing could either increase or decrease interactivity, depending on those other factors. - Generally, however, increasing this increases interactivity and ability to write in parallel, at the cost of throughput. + @param numChunksRateSmoothing - integer >= 1 , Default 5. This is the number of chunks which are used in calculating the current throughput rate. + See #bandwidthPct for the other half of the story. The higher this number, the more "fair" your application will be against a constant + rate of I/O by other applications, but the less able it may be to play fair when the external I/O is spiking. - @param charityTime - float >= 0 - Used to calculate the time to sleep when the charity period hits. The equation is: - sleepTime = charityTime * ((dataWritten / delta) / ( (dataWritten / delta) * priorityPctDec)) - Where dataWritten = number of bytes written already, delta = total time spent writing (not including charity time sleeping) - and priorityPctDec = priorityPct / 100. + Also, consider that this is related to the #defaultChunkSize, as it is not a constant period of time. The default of "5" should be okay, + but you may want to tune it if you use really large or really small chunk sizes. - Increasing this can increase interactivity and allow more parallel operations at the cost of throughput. - The default should be fine for the majority of cases, but it is tunable. An "interactivity score" is defined to be (number of calculations) / (time to write data). 
''' @@ -106,9 +118,11 @@ def __init__(self, chainPollTime, defaultChunkSize, priorityPct, charityRate=1.8 self.chainPollTime = chainPollTime self.defaultChunkSize = defaultChunkSize - self.priorityPct = float(priorityPct) - self.charityRate = float(charityRate) - self.charityTime = float(charityTime) + self.bandwidthPct = float(bandwidthPct) + if bandwidthPct <= 0 or bandwidthPct > 100: + raise ValueError('Given bandwidthPct %f must be > 0 and <= 100') + + self.numChunksRateSmoothing = numChunksRateSmoothing def __getitem__(self, key): if key in BackgroundIOPriority.__slots__: @@ -119,22 +133,22 @@ def __setitem__(self, key, value): if key in BackgroundIOPriority.__slots__: return setattr(self, key, value) raise KeyError('Unknown key: %s\n' %(key,)) - + _SIZE_MEG = 1024 * 1024 # BG_IO_PRIOS - Predefined I/O priorities, 1-10. The lower the number, the more throughput at the cost of interactivity BG_IO_PRIOS = { - 1 : BackgroundIOPriority(.0009, _SIZE_MEG * 5, 100), # Most throughput, least interactivity, but still very interactive. + 1 : BackgroundIOPriority(.0009, _SIZE_MEG * 5, 100), # Maximum throughput, no regard for interactivity. 
2 : BackgroundIOPriority(.0009, _SIZE_MEG * 4, 90), - 3 : BackgroundIOPriority(.0015, _SIZE_MEG * 3, 80), - 4 : BackgroundIOPriority(.0015, _SIZE_MEG * 2, 70), - 5 : BackgroundIOPriority(.0019, _SIZE_MEG * 1.25, 65), - 6 : BackgroundIOPriority(.0019, _SIZE_MEG * .6, 50), - 7 : BackgroundIOPriority(.0024, _SIZE_MEG * .4, 40), - 8 : BackgroundIOPriority(.0024, _SIZE_MEG * .25, 30), - 9 : BackgroundIOPriority(.0031, _SIZE_MEG * .175, 30), - 10 : BackgroundIOPriority(.0100, _SIZE_MEG * .1, 25), # Least throughput, most interactivity, very little throughput + 3 : BackgroundIOPriority(.0015, _SIZE_MEG * 3, 78), + 4 : BackgroundIOPriority(.0015, _SIZE_MEG * 2, 72), + 5 : BackgroundIOPriority(.0019, _SIZE_MEG * 1.6, 65), + 6 : BackgroundIOPriority(.0019, _SIZE_MEG * .75, 55), + 7 : BackgroundIOPriority(.0024, _SIZE_MEG * .69, 45), + 8 : BackgroundIOPriority(.0024, _SIZE_MEG * .5, 35), + 9 : BackgroundIOPriority(.0031, _SIZE_MEG * .3, 30), + 10 : BackgroundIOPriority(.0100, _SIZE_MEG * .25, 20), # Least throughput, most interactivity, very little throughput } @@ -212,14 +226,12 @@ def run(self): # Pull class data into locals fileObj = self.fileObj - priorityPct = self.backgroundIOPriority.priorityPct - priorityPctDec = priorityPct / 100.0 + bandwidthPct = self.backgroundIOPriority.bandwidthPct + bandwidthPctDec = bandwidthPct / 100.0 - charityRate = self.backgroundIOPriority.charityRate - charityTime = self.backgroundIOPriority.charityTime - # Number of blocks, total - numBlocks = len(self.remainingData) + # Number of blocks, total (note: unused, removed) + #numBlocks = len(self.remainingData) # Bytes written dataWritten = 0 @@ -234,43 +246,93 @@ def run(self): doFlush = lambda obj : 1 - # charityPeriod - How often we stop for a short bit to be gracious to other running tasks - charityPeriod = int(max( (priorityPctDec * numBlocks) / charityRate, 3 )) + # numChunksRateSmoothing - How often we stop for a short bit to be gracious to other running tasks. 
+ # float for division below + numChunksRateSmoothing = float(self.backgroundIOPriority.numChunksRateSmoothing) - # i will be the counter from 1 to charityPeriod, and then reset + # i will be the counter from 1 to numChunksRateSmoothing, and then reset i = 1 + # We start with using max bandwidth until we hit #numChunksRateSmoothing , at which case we recalculate + # sleepTime. We sleep after every block written to maintain a desired average throughput based on + # bandwidthPct + sleepTime = 0 + # Before represents the "start" time. When we sleep, we will increment this value # such that [ delta = (after - before) ] only accounts for time we've spent writing, # not in charity. before = time.time() + + # timeSlept - Amount of time slept, which must be subtracted from total time spend + # to get an accurate picture of throughput. + timeSlept = 0 + + # firstPass - Mark the first pass through, so we can get a rough calculation + # of speed from the first write, and recalculate after #numChunksRateSmoothing + firstPass = True + + if bandwidthPct == 100: + shouldRecalculate = lambda i, numChunksRateSmoothing, firstPass : False + else: + shouldRecalculate = lambda i, numChunksRateSmoothing, firstPass : firstPass or i == numChunksRateSmoothing + + while len(self.remainingData) > 0: # pop, write, flush nextData = self.remainingData.popleft() fileObj.write(nextData) doFlush(fileObj) - + dataWritten += len(nextData) - - if i == charityPeriod: + if sleepTime: + sleepBefore = time.time() + + time.sleep(sleepTime) + + sleepAfter = time.time() + timeSlept += (sleepAfter - sleepBefore) + + if shouldRecalculate(i, numChunksRateSmoothing, firstPass) is True: + # if not sleeptime, we are on first # We've completed a full period, time for charity after = time.time() - delta = after - before - # Uncomment the following to see rates + delta = after - before - timeSlept -# sys.stdout.write('\t I have written %d bytes in %3.3f seconds (%4.5f M/s)\n' %(dataWritten, delta, (dataWritten / 
delta) / (1024*1024) )) -# sys.stdout.flush() - # Calculate how much time we should give up to others - sleepTime = charityTime * ((dataWritten / delta) / ( (dataWritten / delta) * priorityPctDec)) -# sys.stdout.write('DOING sleepTime is: %f\n' %(sleepTime,)) - time.sleep(sleepTime) +# if DEBUG is True: +# rate = dataWritten / delta + +# sys.stdout.write('\t I have written %d bytes in %3.3f seconds and slept %3.3f sec (%4.5f M/s over %3.3fs)\n' %(dataWritten, delta, timeSlept, (rate) / (1024*1024), delta + timeSlept )) +# sys.stdout.flush() + + # Calculate how much time we should give up on each block to other tasks + sleepTime = delta * (1.00 - bandwidthPctDec) + sleepTime /= numChunksRateSmoothing - # Adjust so that [ delta = (after - before) ] only reflects time spent writing, and reset counter - before += (time.time() - after) +# if DEBUG is True: +# sys.stdout.write('Calculated new sleepTime to be: %f\n' %(sleepTime,)) + + timeSlept = 0 + before = time.time() i = 0 +# elif DEBUG is True and i == numChunksRateSmoothing: +# # When bandwidth pct is 100 (prio=1), the above DEBUG will never be hit. +# after = time.time() +# +# delta = after - before - timeSlept +# +# rate = dataWritten / delta +# +# sys.stdout.write('\t I have written %d bytes in %3.3f seconds and slept %3.3f sec (%4.5f M/s over %3.3fs)\n' %(dataWritten, delta, timeSlept, (rate) / (1024*1024), delta + timeSlept )) +# sys.stdout.flush() +# +# timeSlept = 0 +# before = time.time() +# i = 0 + + firstPass = False i += 1 diff --git a/nonblock/__init__.py b/nonblock/__init__.py index f86ec12..8734cc3 100644 --- a/nonblock/__init__.py +++ b/nonblock/__init__.py @@ -1,5 +1,5 @@ ''' - Copyright (c) 2015-2016 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution. + Copyright (c) 2015-2016, 2017, 2019 Timothy Savannah under terms of LGPLv2. You should have received a copy of this LICENSE with this distribution. 
Contains pure-python functions for non-blocking IO in python ''' @@ -14,6 +14,6 @@ __all__ = ('nonblock_read', 'bgwrite', 'bgwrite_chunk', 'BackgroundIOPriority', 'bgread') -__version__ = '3.0.1' -__version_tuple = (3, 0, 1) +__version__ = '4.0.1' +__version_tuple = (4, 0, 1) diff --git a/nonblock/common.py b/nonblock/common.py index 23c9694..d8cab38 100644 --- a/nonblock/common.py +++ b/nonblock/common.py @@ -18,7 +18,7 @@ def detect_stream_mode(stream): elif 't' in stream.mode: return str - # Read a zero-length string off the device + # Read a zero-length string off the device if hasattr(stream, 'read'): zeroStr = stream.read(0) if type(zeroStr) is str: diff --git a/nonblock/read.py b/nonblock/read.py index f6b79f9..b2ea751 100644 --- a/nonblock/read.py +++ b/nonblock/read.py @@ -3,7 +3,7 @@ read.py Contains pure-python functions for non-blocking reads in python - + ''' # vim: ts=4 sw=4 expandtab diff --git a/setup.py b/setup.py index 86be700..5b6da34 100755 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ long_description = summary setup(name='python-nonblock', - version='3.0.1', + version='4.0.1', packages=['nonblock'], author='Tim Savannah', author_email='kata198@gmail.com', @@ -30,6 +30,7 @@ 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', 'Operating System :: POSIX', 'Operating System :: POSIX :: Linux', 'Operating System :: Unix', diff --git a/testWrite.py b/testWrite.py index a832a60..f8e1112 100755 --- a/testWrite.py +++ b/testWrite.py @@ -1,9 +1,11 @@ #!/usr/bin/env python ''' - This testWrite.py file is a testcase for python-nonblock (c) 2015 Tim Savannah. + This testWrite.py file is a testcase for python-nonblock Copyright (c) 2015, 2016, 2017, 2019 Timothy Savannah. The testcase itself is hereby granted access as a Public Domain work, or the closest legally in your area. 
+ This tests the various priority levels of BackgroundWrite and gives output based on results. + You can use this to experiment with different values and priority levels on your hardware and use cases. It will perform some math/memory operations whilst writing 2 files using chains at the same time. @@ -11,10 +13,22 @@ "interactivity score". The interactivity score is the number of calculations divided by the time you spent writing. The higher the ioPrio, the higher the interactivity score should be. It is a measure of how much OTHER work got done while you were performing otherwise blocking I/O operations. + + You may consider having a "normal" background workload going before running this test, so you can guage for yourself + what interactivity level to choose with BackgroundWrite depending on if you want to maximize throughput of your write, + or cpu-bound / other processes. + + There is a "DO_SYNC" option, that if True will cause the drive to flush all data to disk. You may want to play with this on + and off. For example, if you are running a dd if=/dev/urandom of=testfile bs=512 to simulate background I/O on the same device, + it will flush both datasets. This is moreso intended to be set to True when NO background I/O is going. ''' + import os import time import sys +import subprocess +import tempfile + from nonblock import bgwrite, bgwrite_chunk @@ -25,46 +39,158 @@ class dummy(object): pass -# Pick a big file -BIG_FILE = "/usr/lib/libc-2.22.so" +# TODO: Use os.linesep instead of explicit '\n' ? + +# Generate a big file + +# Set to True to sync ALL I/O (this process and others) to disk before each test. +# This should be True if no background I/O is being ran, and maybe True maybe False if some is running. +# It will make tests more "fair" between eachother, but with various background tasks it may introduce +# a large variance and margin of error. 
+DO_SYNC = True + +def printUsage(): + sys.stderr.write('''Usage: %s (Optional: [start prio] [end prio]) + Runs through priority levels, either specified start/end (inclusive), or if no args provided, the full set 1-9. + +If specified, start prio must be <= end prio, +start prio must be >= 1, +end prio must be <= 9. + +Tests interactivity by performing math during two sets of simultaneous bgwrites +''' %(os.path.basename(__file__), ) +) + sys.stderr.flush() if __name__ == '__main__': - startPrio = 1 - endPrio = 10 + if '--help' in sys.argv: + printUsage() + sys.exit(0) + + if len(sys.argv) not in (1, 3): + sys.stderr.write('Invalid arguments.\n\n') + printUsage() + sys.exit(1) if len(sys.argv) == 3: + ## They have provided a start and end priority + + # Validate they are integers + if not sys.argv[1].isdigit() and not sys.argv[2].isdigit(): + sys.stderr.write('Right number of arguments, but not whole integers.\n\n') + printUsage() + sys.exit(1) + + # Extract priorities to test from arguments startPrio = int(sys.argv[1]) - endPrio = int(sys.argv[2]) + 1 + endPrio = int(sys.argv[2]) + + # Validate these are valid priorities, and in valid order + if startPrio > endPrio: + sys.stderr.write('Start priority is greater than end priority, impossible!\n\n') + printUsage() + sys.exit(1) + + if startPrio < 1: + sys.stderr.write('Start priority must be >= 1\n\n') + printUsage() + sys.exit(1) + + if endPrio > 9: + sys.stderr.write('End priority must be <= 9\n\n') + printUsage() + sys.exit(1) + + else: + + print ( 'No arguments provided, defaulting to testing full set (1-9 inclusive) of IO priorities\n' ) + + startPrio = 1 + endPrio = 9 + + + + ## Generate a fairly large dataset + + bigFile = tempfile.NamedTemporaryFile(mode='wt') + # Write some data + numLetters = ord('z') - ord('a') + lettersLst = [ chr( ord('a') + i ) for i in range(numLetters) ] + lettersStr = ''.join(lettersLst) + curLen = 0 + before = time.time() + # Size of my libc at time of writing + while curLen < 
2317344: + bigFile.write(lettersStr) + curLen += numLetters + + # Try to force flush + try: + bigFile.flush() + except: + pass + after = time.time() + + print ( 'Generated %d bytes of data in %.5f seconds.' %( curLen, after - before ) ) + + bigFilename = bigFile.name + + currentDirectory = os.path.abspath( os.path.dirname( __file__ ) ) + baseFilename = os.path.basename( __file__ ) + print ( 'Using containing directory of this file [ %s ] for writes...\n' %( currentDirectory, ) ) + + # Change dir to this directory, so we don't have to use os.sep + os.chdir(currentDirectory) username = os.environ['USER'] # Some values used for the math x = 13 y = 37 - - # Get some big data - with open(BIG_FILE, 'rb') as f: + + # Get some big data - open a fresh copy so we aren't reading from buffer + with open(bigFilename, 'rb') as f: before = time.time() data = f.read() after = time.time() - sys.stdout.write('Time to read: %f\n' %(after - before,)) + print ('Time to read %d bytes: %f\n' %(len(data), after - before,) ) + + print ( 'Running through IO priorities, %d -> %d (inclusive),\n whilst running simultaneous math calculations to test interactivity\n and I/O rate at each level...\n\n%s\n\n' %( startPrio, endPrio, '-' * 50, ) ) # Expand that big data data = data * 50 + dataLen = len(data) + # Iterate through the I/O priorities, do the operation, and show the score. 
- for ioPrio in range(startPrio, endPrio, 1): + for ioPrio in range(startPrio, endPrio + 1, 1): answers = [22] answers2 = [16] answers3 = [81] - if os.path.exists('/home/%s/nb_test_output1' %(username,)): - os.unlink('/home/%s/nb_test_output1' %(username,)) - if os.path.exists('/home/%s/nb_test_output2' %(username,)): - os.unlink('/home/%s/nb_test_output2' %(username,)) - f = open('/home/%s/nb_test_output1' %(username,), 'wb') - f2 = open('/home/%s/nb_test_output2' %(username,), 'wb') + if os.path.exists('nb_test_output1'): + os.unlink('nb_test_output1') + if os.path.exists('nb_test_output2'): + os.unlink('nb_test_output2') + f = open('nb_test_output1', 'wb') + f2 = open('nb_test_output2', 'wb') + + if DO_SYNC is True: + # Ensure pending data is flushed so next run has fair chance + if sys.version_info.major >= 3: + # python3 supports os.sync + os.sync() + elif sys.platform == 'linux': + # python2 has no sync, but if we are on linux we can try + # the sync command + try: + pipe = subprocess.Popen('sync', shell=True) + pipe.wait() + except: + pass + time.sleep(3) + before = time.time() # Start first chain, final will close f1 @@ -94,15 +220,24 @@ class dummy(object): numAnswers = len(answers) + len(answers2) + len(answers3) - 3 delta = round(after - before, 5) - sys.stdout.write(('-' * 40) + '\n') - sys.stdout.write('[%d] Time to write: %f\n' %(ioPrio, delta) ) - sys.stdout.write('[%d] Number of answers generated: %d\n' %(ioPrio, numAnswers) ) + sys.stdout.write(('-' * 50) + '\n') + sys.stdout.write('[%d] Time to write: %f seconds\n' %(ioPrio, delta) ) + sys.stdout.write('[%d] Number of answers generated during writes: %d\n' %(ioPrio, numAnswers) ) + sys.stdout.write('[%d] Average write speed: %f M/s\n' %(ioPrio, round((dataLen / delta) / (1024.0 * 1024.0), 5) ) ) sys.stdout.write('[%d] Interactivity score: %f\n' % (ioPrio, round(numAnswers / delta, 5)) ) - sys.stdout.write(('=' * 40) + '\n\n') + sys.stdout.write(('=' * 50) + '\n\n') sys.stdout.flush() - if 
os.path.exists('/home/%s/nb_test_output1' %(username,)): - os.unlink('/home/%s/nb_test_output1' %(username,)) - if os.path.exists('/home/%s/nb_test_output2' %(username,)): - os.unlink('/home/%s/nb_test_output2' %(username,)) + + print ( 'Cleaning-up') + + if os.path.exists('nb_test_output1'): + os.unlink('nb_test_output1') + if os.path.exists('nb_test_output2'): + os.unlink('nb_test_output2') + + # Cleanup the generated data + bigFile.close() + +# vim: set ts=4 st=4 sw=4 expandtab :