Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Latest commit

 

History

History
History
120 lines (94 loc) · 3.72 KB

File metadata and controls

120 lines (94 loc) · 3.72 KB
Copy raw file
Download raw file
Open symbols panel
Edit and raw actions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# test_utils.py
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
#
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
import tempfile
from test.testlib import *
from git.utils import *
from git import *
from git.cmd import dashify
import time
class TestUtils(TestCase):
def setup(self):
self.testdict = {
"string": "42",
"int": 42,
"array": [ 42 ],
}
def test_it_should_dashify(self):
assert_equal('this-is-my-argument', dashify('this_is_my_argument'))
assert_equal('foo', dashify('foo'))
def test_lock_file(self):
my_file = tempfile.mktemp()
lock_file = LockFile(my_file)
assert not lock_file._has_lock()
# release lock we don't have - fine
lock_file._release_lock()
# get lock
lock_file._obtain_lock_or_raise()
assert lock_file._has_lock()
# concurrent access
other_lock_file = LockFile(my_file)
assert not other_lock_file._has_lock()
self.failUnlessRaises(IOError, other_lock_file._obtain_lock_or_raise)
lock_file._release_lock()
assert not lock_file._has_lock()
other_lock_file._obtain_lock_or_raise()
self.failUnlessRaises(IOError, lock_file._obtain_lock_or_raise)
# auto-release on destruction
del(other_lock_file)
lock_file._obtain_lock_or_raise()
lock_file._release_lock()
def test_blocking_lock_file(self):
my_file = tempfile.mktemp()
lock_file = BlockingLockFile(my_file)
lock_file._obtain_lock()
# next one waits for the lock
start = time.time()
wait_time = 0.1
wait_lock = BlockingLockFile(my_file, 0.05, wait_time)
self.failUnlessRaises(IOError, wait_lock._obtain_lock)
elapsed = time.time() - start
assert elapsed <= wait_time + 0.02 # some extra time it may cost
def _cmp_contents(self, file_path, data):
# raise if data from file at file_path
# does not match data string
fp = open(file_path, "rb")
try:
assert fp.read() == data
finally:
fp.close()
def test_safe_operation(self):
my_file = tempfile.mktemp()
orig_data = "hello"
new_data = "world"
my_file_fp = open(my_file, "wb")
my_file_fp.write(orig_data)
my_file_fp.close()
try:
cwrite = ConcurrentWriteOperation(my_file)
# didn't start writing, doesnt matter
cwrite._end_writing(False)
cwrite._end_writing(True)
assert not cwrite._is_writing()
# write data and fail
stream = cwrite._begin_writing()
assert cwrite._is_writing()
stream.write(new_data)
cwrite._end_writing(successful=False)
self._cmp_contents(my_file, orig_data)
assert not os.path.exists(stream.name)
# write data - concurrently
ocwrite = ConcurrentWriteOperation(my_file)
stream = cwrite._begin_writing()
self.failUnlessRaises(IOError, ocwrite._begin_writing)
stream.write("world")
cwrite._end_writing(successful=True)
self._cmp_contents(my_file, new_data)
assert not os.path.exists(stream.name)
# could test automatic _end_writing on destruction
finally:
os.remove(my_file)
# END final cleanup
Morty Proxy This is a proxified and sanitized view of the page, visit original site.