Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Latest commit

 

History

History
History
64 lines (49 loc) · 1.87 KB

File metadata and controls

64 lines (49 loc) · 1.87 KB
Copy raw file
Download raw file
Open symbols panel
Edit and raw actions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module with examples from the tutorial section of the docs"""
from lib import TestBase, fixture_path
from git.base import IStream
from git.db.py.loose import PureLooseObjectODB
from git.util import pool
from cStringIO import StringIO
from async import IteratorReader
class TestExamples(TestBase):
def test_base(self):
ldb = PureLooseObjectODB(fixture_path("../../../.git/objects"))
for sha1 in ldb.sha_iter():
oinfo = ldb.info(sha1)
ostream = ldb.stream(sha1)
assert oinfo[:3] == ostream[:3]
assert len(ostream.read()) == ostream.size
assert ldb.has_object(oinfo.binsha)
# END for each sha in database
# assure we close all files
try:
del(ostream)
del(oinfo)
except UnboundLocalError:
pass
# END ignore exception if there are no loose objects
data = "my data"
istream = IStream("blob", len(data), StringIO(data))
# the object does not yet have a sha
assert istream.binsha is None
ldb.store(istream)
# now the sha is set
assert len(istream.binsha) == 20
assert ldb.has_object(istream.binsha)
# async operation
# Create a reader from an iterator
reader = IteratorReader(ldb.sha_iter())
# get reader for object streams
info_reader = ldb.stream_async(reader)
# read one
info = info_reader.read(1)[0]
# read all the rest until depletion
ostreams = info_reader.read()
# set the pool to use two threads
pool.set_size(2)
# synchronize the mode of operation
pool.set_size(0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.