Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 7b7f3af

Browse filesBrowse files
authored
Fix sasl gssapi plugin: do not rely on client_ctx.complete in auth_bytes() (#2631)
1 parent 48dd596 commit 7b7f3af
Copy full SHA for 7b7f3af

File tree

Expand file treeCollapse file tree

2 files changed

+54
-5
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+54
-5
lines changed

‎kafka/sasl/gssapi.py

Copy file name to clipboardExpand all lines: kafka/sasl/gssapi.py
+12-5Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ def __init__(self, **config):
2626
raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration')
2727
self._is_done = False
2828
self._is_authenticated = False
29+
self.gssapi_name = None
2930
if config.get('sasl_kerberos_name', None) is not None:
3031
self.auth_id = str(config['sasl_kerberos_name'])
32+
if isinstance(config['sasl_kerberos_name'], gssapi.Name):
33+
self.gssapi_name = config['sasl_kerberos_name']
3134
else:
3235
kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '')
3336
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name
34-
if isinstance(config.get('sasl_kerberos_name', None), gssapi.Name):
35-
self.gssapi_name = config['sasl_kerberos_name']
36-
else:
37+
if self.gssapi_name is None:
3738
self.gssapi_name = gssapi.Name(self.auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
3839
self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate')
3940
self._next_token = self._client_ctx.step(None)
@@ -43,9 +44,8 @@ def auth_bytes(self):
4344
# so mark is_done after the final auth_bytes are provided
4445
# in practice we'll still receive a response when using SaslAuthenticate
4546
# but not when using the prior unframed approach.
46-
if self._client_ctx.complete:
47+
if self._is_authenticated:
4748
self._is_done = True
48-
self._is_authenticated = True
4949
return self._next_token or b''
5050

5151
def receive(self, auth_bytes):
@@ -74,6 +74,13 @@ def receive(self, auth_bytes):
7474
]
7575
# add authorization identity to the response, and GSS-wrap
7676
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message
77+
# We need to identify the last token in auth_bytes();
78+
# we can't rely on client_ctx.complete because it becomes True after generating
79+
# the second-to-last token (after calling .step(auth_bytes) for the final time)
80+
# We could introduce an additional state variable (i.e., self._final_token),
81+
# but instead we just set _is_authenticated. Since the plugin interface does
82+
# not read is_authenticated() until after is_done() is True, this should be fine.
83+
self._is_authenticated = True
7784

7885
def is_done(self):
7986
return self._is_done

‎test/sasl/test_gssapi.py

Copy file name to clipboard
+42Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from __future__ import absolute_import
2+
3+
try:
4+
from unittest import mock
5+
except ImportError:
6+
import mock
7+
8+
from kafka.sasl import get_sasl_mechanism
9+
import kafka.sasl.gssapi
10+
11+
12+
def test_gssapi():
13+
config = {
14+
'sasl_kerberos_domain_name': 'foo',
15+
'sasl_kerberos_service_name': 'bar',
16+
}
17+
client_ctx = mock.Mock()
18+
client_ctx.step.side_effect = [b'init', b'exchange', b'complete', b'xxxx']
19+
client_ctx.complete = False
20+
def mocked_message_wrapper(msg, *args):
21+
wrapped = mock.Mock()
22+
type(wrapped).message = mock.PropertyMock(return_value=msg)
23+
return wrapped
24+
client_ctx.unwrap.side_effect = mocked_message_wrapper
25+
client_ctx.wrap.side_effect = mocked_message_wrapper
26+
kafka.sasl.gssapi.gssapi = mock.Mock()
27+
kafka.sasl.gssapi.gssapi.SecurityContext.return_value = client_ctx
28+
gssapi = get_sasl_mechanism('GSSAPI')(**config)
29+
assert isinstance(gssapi, kafka.sasl.gssapi.SaslMechanismGSSAPI)
30+
client_ctx.step.assert_called_with(None)
31+
32+
while not gssapi.is_done():
33+
send_token = gssapi.auth_bytes()
34+
receive_token = send_token # not realistic, but enough for testing
35+
if send_token == b'\x00cbar@foo': # final wrapped message
36+
receive_token = b'' # final message gets an empty response
37+
gssapi.receive(receive_token)
38+
if client_ctx.step.call_count == 3:
39+
client_ctx.complete = True
40+
41+
assert gssapi.is_done()
42+
assert gssapi.is_authenticated()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.