Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Latest commit

 

History

History
History
144 lines (97 loc) · 3.07 KB

File metadata and controls

144 lines (97 loc) · 3.07 KB
Copy raw file
Download raw file
Open symbols panel
Edit and raw actions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import threading
import string
import random
import six
from copy import copy
from pubnub import utils
from pubnub.crypto import PubNubCryptodome
from pubnub.pnconfiguration import PNConfiguration
try:
from mock import patch
except ImportError:
from unittest.mock import patch # noqa: F401
crypto = PubNubCryptodome()
pub_key = "pub-c-739aa0fc-3ed5-472b-af26-aca1b333ec52"
sub_key = "sub-c-33f55052-190b-11e6-bfbc-02ee2ddab7fe"
pub_key_pam = "pub-c-98863562-19a6-4760-bf0b-d537d1f5c582"
sub_key_pam = "sub-c-7ba2ac4c-4836-11e6-85a4-0619f8945a4f"
sec_key_pam = "sec-c-MGFkMjQxYjMtNTUxZC00YzE3LWFiZGYtNzUwMjdjNmM3NDhk"
pnconf = PNConfiguration()
pnconf.publish_key = pub_key
pnconf.subscribe_key = sub_key
pnconf.enable_subscribe = False
pnconf_sub = PNConfiguration()
pnconf_sub.publish_key = pub_key
pnconf_sub.subscribe_key = sub_key
pnconf_enc = PNConfiguration()
pnconf_enc.publish_key = pub_key
pnconf_enc.subscribe_key = sub_key
pnconf_enc.cipher_key = "testKey"
pnconf_enc.enable_subscribe = False
pnconf_enc_sub = PNConfiguration()
pnconf_enc_sub.publish_key = pub_key
pnconf_enc_sub.subscribe_key = sub_key
pnconf_enc_sub.cipher_key = "testKey"
pnconf_pam = PNConfiguration()
pnconf_pam.publish_key = pub_key_pam
pnconf_pam.subscribe_key = sub_key_pam
pnconf_pam.secret_key = sec_key_pam
pnconf_pam.enable_subscribe = False
pnconf_ssl = PNConfiguration()
pnconf_ssl.publish_key = pub_key
pnconf_ssl.subscribe_key = sub_key
pnconf_ssl.ssl = True
def pnconf_copy():
return copy(pnconf)
def pnconf_enc_copy():
return copy(pnconf_enc)
def pnconf_sub_copy():
return copy(pnconf_sub)
def pnconf_enc_sub_copy():
return copy(pnconf_enc_sub)
def pnconf_pam_copy():
return copy(pnconf_pam)
def pnconf_ssl_copy():
return copy(pnconf_ssl)
sdk_name = "Python-UnitTest"
def url_encode(data):
return utils.url_encode(utils.write_value_as_string(data))
def url_decode(data):
return six.moves.urllib.parse.unquote(data)
def gen_channel(prefix):
return "%s-%s" % (prefix, gen_string(8))
def gen_string(l):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(l))
def gen_decrypt_func(cipher_key):
def decrypter(entry):
mr = crypto.decrypt(cipher_key, entry)
return mr
return decrypter
class CountDownLatch(object):
def __init__(self, count=1):
self.count = count
self.lock = threading.Condition()
self.done = False
self.t = None
def count_down(self):
self.lock.acquire()
self.count -= 1
if self.count <= 0:
self.done = True
self.lock.notifyAll()
if self.t is not None:
self.t.cancel()
self.lock.release()
def _release(self):
self.lock.acquire()
self.count = 0
self.lock.notifyAll()
self.lock.release()
def await(self, timeout=5):
self.lock.acquire()
self.t = threading.Timer(timeout, self._release)
self.t.start()
while self.count > 0:
self.lock.wait()
self.t.cancel()
self.lock.release()
Morty Proxy This is a proxified and sanitized view of the page, visit original site.