Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Latest commit

 

History

History
History
169 lines (130 loc) · 5 KB

File metadata and controls

169 lines (130 loc) · 5 KB
Copy raw file
Download raw file
Open symbols panel
Edit and raw actions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
A module dedicated to working with local folders/files.
"""
import os
from copy import copy
import fnmatch
from itertools import chain
import hashlib
from .utils import logger
from .constants import (IGNORES_DEFAULT, IGNORE_FILES_NAMES, MAX_BUCKET_SIZE)
def prepare_file_path(filepath):
    """Convert a local file path into its bundle representation.

    The path is made relative to the current working directory,
    Windows backslashes are normalized to forward slashes, and the
    result is prefixed with a single leading slash.
    """
    bundle_path = os.path.relpath(filepath).replace('\\', '/')
    return '/' + bundle_path
def resolve_file_path(bundle_filepath):
    """Inverse of prepare_file_path: map a bundle path (with its leading
    slash) back to an absolute local path, restoring OS-native separators
    on non-POSIX platforms."""
    local_path = bundle_filepath[1:]  # drop the leading '/' added by prepare_file_path
    if os.name == 'posix':
        return os.path.abspath(local_path)
    return os.path.abspath(local_path.replace('/', '\\'))
def get_file_content(file_path):
    """Read the file at *file_path* and return its full content as UTF-8 text."""
    with open(file_path, mode='r', encoding='utf-8') as source:
        return source.read()
def parse_file_ignores(file_path):
    """Yield glob patterns parsed from an ignore file (.gitignore-style).

    Blank lines and '#' comments are skipped. Every rule is anchored to
    the ignore file's directory; rules that do not start with '/' are
    additionally yielded with a '**' segment so they match at any depth.
    """
    base_dir = os.path.dirname(file_path)
    with open(file_path, mode='r', encoding='utf-8') as ignore_file:
        for raw_line in ignore_file:
            pattern = raw_line.strip()
            if not pattern or pattern.startswith('#'):
                continue
            yield os.path.join(base_dir, pattern)
            if not pattern.startswith('/'):
                yield os.path.join(base_dir, '**', pattern)
def is_ignored(path, file_ignores):
    """Return True if *path* matches any glob pattern in *file_ignores*.

    The first matching pattern is logged at debug level; patterns are
    checked in iteration order.
    """
    matched = next(
        (pattern for pattern in file_ignores if fnmatch.fnmatch(path, pattern)),
        None,
    )
    if matched is None:
        return False
    logger.debug('pattern: {} | ignored: {}'.format(matched, path))
    return True
def collect_bundle_files(paths, file_filter, file_ignores=IGNORES_DEFAULT):
    """Recursively yield file paths under *paths* that pass *file_filter*
    and are not excluded by *file_ignores*.

    Ignore files (names listed in IGNORE_FILES_NAMES) discovered in a
    directory extend the ignore set for that directory and everything
    below it. Symlinks are skipped entirely.

    :param paths: iterable of directory paths to scan
    :param file_filter: predicate called with the entry *name* (not path)
    :param file_ignores: set of glob patterns; assumed to be a flat set of
        strings, since new rules are inserted via ``.add`` below
    """
    # Copy so rules discovered during this walk do not leak back into the
    # caller's ignore set.
    local_file_ignores = copy(file_ignores)
    for path in paths:
        with os.scandir(path) as it:
            local_files = []
            sub_dirs = []
            # Flag: this directory contained an ignore file, so files already
            # collected may have been checked against an incomplete rule set.
            local_ignore_file = False
            for entry in it:
                if entry.is_symlink():
                    # To prevent possible infinite loops, we ignore symlinks for now
                    continue
                if entry.is_dir():
                    sub_dirs.append(entry.path)
                    continue
                if entry.name in IGNORE_FILES_NAMES:
                    # Merge this ignore file's rules into the local set.
                    for ignore_rule in parse_file_ignores(entry.path):
                        local_file_ignores.add(ignore_rule)
                    local_ignore_file = True
                    logger.debug('recognized ignore rules in file --> {}'.format(entry.path))
                    continue
                if entry.is_file() \
                        and file_filter(entry.name) \
                        and not is_ignored(entry.path, local_file_ignores):
                    local_files.append(entry.path)
            if local_ignore_file:
                # Re-filter: files encountered before the ignore file in
                # scandir order were only checked against the old rules.
                local_files = [p for p in local_files if not is_ignored(p, local_file_ignores)]
            yield from local_files
            sub_dirs = [
                subdir for subdir in sub_dirs
                if not is_ignored(subdir, local_file_ignores)
            ]
            # Recurse with the (possibly extended) ignore set.
            yield from collect_bundle_files(sub_dirs, file_filter, local_file_ignores)
def get_file_meta(file_path):
    """Return a (length, sha256 hex digest) pair for the file's UTF-8 text.

    Length is measured in characters of the decoded text; the digest is
    computed over its UTF-8 encoding.
    """
    with open(file_path, encoding='utf-8', mode='r') as f:
        text = f.read()
    digest = hashlib.sha256(text.encode('utf-8')).hexdigest()
    return (len(text), digest)
def prepare_bundle_hashes(bundle_files, bucket_size=MAX_BUCKET_SIZE):
    """Compute (path, sha256) pairs for the given files.

    Files that cannot be decoded as UTF-8 are skipped with a debug log
    entry, as are files whose text length is >= *bucket_size*.

    :param bundle_files: iterable of local file paths
    :param bucket_size: maximum allowed file size (defaults to MAX_BUCKET_SIZE)
    :return: list of (file_path, file_hash) tuples
    """
    items = []
    for file_path in bundle_files:
        try:
            file_size, file_hash = get_file_meta(file_path)
        except UnicodeDecodeError:
            # Binary / non-UTF-8 content cannot be hashed as text; skip it.
            # Fixed typo in the log message ('ecxluded' -> 'excluded').
            logger.debug('excluded a file --> {} (Unicode Decode Error)'.format(file_path))
        else:
            if file_size < bucket_size:
                items.append((file_path, file_hash))
    return items
def compose_file_buckets(missing_files, bucket_size=MAX_BUCKET_SIZE):
    """
    Splits files into buckets with limiting max size
    Returns list of items: (path, hash)

    Buckets are yielded early once they are at least 99% full; any
    non-empty remainder buckets are yielded at the end.
    """
    # Start with one empty bucket. 'size' tracks REMAINING capacity,
    # not the amount already stored.
    buckets = [{
        'size': bucket_size,
        'files': []
    }]
    def route_file_to_bucket(raw_file_path):
        # Place one file into the first bucket with enough remaining room
        # (first-fit). Returns the chosen bucket, or None if the file alone
        # exceeds bucket_size and is dropped.
        file_path = resolve_file_path(raw_file_path)
        # Get file details
        file_size, file_hash = get_file_meta(file_path)
        # Check that file does not exceed max bucket size
        if file_size > bucket_size:
            logger.debug('excluded big file --> {} ({} bytes)'.format(file_path, file_size))
            return
        # Try to find existing bucket
        for bucket in buckets:
            if bucket['size'] >= file_size:
                bucket['files'].append( (file_path, file_hash) )
                bucket['size'] -= file_size
                return bucket
        # No existing bucket had room: open a new one seeded with this file.
        bucket = {
            'size': bucket_size - file_size,
            'files': [ (file_path, file_hash) ]
        }
        buckets.append(bucket)
        return bucket
    for raw_file_path in missing_files:
        bucket = route_file_to_bucket(raw_file_path)
        if not bucket:
            continue
        # Less than 1% capacity left -> treat the bucket as full and hand
        # it to the consumer immediately.
        if bucket['size'] < bucket_size * 0.01:
            yield bucket['files'] # Give bucket to requester
            buckets.remove(bucket) # Remove it as fulfilled
    # Send all left-over buckets
    for bucket in buckets:
        if bucket['files']:
            yield bucket['files']
Morty Proxy This is a proxified and sanitized view of the page, visit original site.