Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Apr 15, 2024. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions 123 consul/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
log = logging.getLogger(__name__)


def args_to_payload(args_map):
return [
(key, value)
for key, value in
{
k: v for
k, v in args_map.items() if v is not None and k != "self"
}.items()
]

class ConsulException(Exception):
pass

Expand Down Expand Up @@ -1933,6 +1943,119 @@ class ACL(object):
def __init__(self, agent):
self.agent = agent

def create_token(self, accessor_id=None, secret_id=None, description=None, policies=None, roles=None, service_identities=None, local=None, expiration_time=None, expiration_ttl=None):
return self.agent.http.put(
CB.json(), '/v1/acl/token', params=args_to_payload(locals()))

def read_token(self, accessor_id):
return self.agent.http.get(
CB.json(), '/v1/acl/token/{}'.format(accessor_id))

def read_self_token(self):
return self.agent.http.get(
CB.json(), '/v1/acl/token/self')

def update_token(self, accessor_id, secret_id=None, description=None, policies=None, roles=None, service_identities=None, local=None, expiration_time=None, expiration_ttl=None):
return self.agent.http.put(
CB.json(), '/v1/acl/token/{}'.format(accessor_id), params=args_to_payload(locals()))

def clone_token(self, accessor_id, description=None):
return self.agent.http.put(
CB.json(), '/v1/acl/token/{}/clone'.format(accessor_id),
params=[("description", description)] if description != "" else [])

def delete_token(self, accessor_id):
return self.agent.http.delete(
CB.json(), '/v1/acl/token/{}'.format(accessor_id))

def list_tokens(self):
return self.agent.http.get(
CB.json(), '/v1/acl/tokens')

def create_policy(self, name, description=None, rules=None, datacenters=None):
return self.agent.http.put(
CB.json(), '/v1/acl/policy', params=args_to_payload(locals()))

def read_policy(self, policy_id):
return self.agent.http.get(
CB.json(), '/v1/acl/policy/{}'.format(policy_id))

def update_policy(self, policy_id, name, description=None, rules=None, datacenters=None):
return self.agent.http.put(
CB.json(), '/v1/acl/policy/{}'.format(policy_id), args_to_payload(locals()))

def delete_policy(self, policy_id):
return self.agent.http.delete(
CB.json(), '/v1/acl/policy/{}'.format(policy_id))

def list_policies(self):
return self.agent.http.get(
CB.json(), '/v1/acl/policies')

def create_role(self, name, description=None, policies=None, service_identities=None):
return self.agent.http.put(
CB.json(), '/v1/acl/role', params=args_to_payload(locals()))

def read_role(self, role_id):
return self.agent.http.get(
CB.json(), '/v1/acl/role/{}'.format(role_id))

def read_role_by_name(self, role_name):
return self.agent.http.get(
CB.json(), '/v1/acl/role/name/{}'.format(role_name))

def update_role(self, role_id, name, description=None, policies=None, service_identities=None):
return self.agent.http.put(
CB.json(), '/v1/acl/role/{}'.format(role_id), args_to_payload(locals()))

def delete_role(self, role_id):
return self.agent.http.delete(
CB.json(), '/v1/acl/role/{}'.format(role_id))

def list_roles(self, policy_id=None):
return self.agent.http.get(
CB.json(), '/v1/acl/roles', params=[("policy_id", policy_id)] if policy_id != "" else [])

def create_auth_method(self, auth_method_name, auth_method_type, config, description=None):
return self.agent.http.put(
CB.json(), '/v1/acl/auth-method', params=args_to_payload(locals()))

def read_auth_method(self, auth_method_name):
return self.agent.http.put(
CB.json(), '/v1/acl/auth-method/{}'.format(auth_method_name))

def update_auth_method(self, auth_method_name, auth_method_type, config, description=None):
return self.agent.http.put(
CB.json(), '/v1/acl/auth-method/{}'.format(auth_method_name), args_to_payload(locals()))

def delete_auth_method(self, auth_method_name):
return self.agent.http.delete(
CB.json(), '/v1/acl/auth-method/{}'.format(auth_method_name))

def list_auth_methods(self):
return self.agent.http.get(
CB.json(), '/v1/acl/auth-methods')

def create_binding_rule(self, binding_rule, bind_type, bind_name, description=None, selector=None):
return self.agent.http.put(
CB.json(), '/v1/acl/binding-rule', params=args_to_payload(locals()))

def read_binding_rule(self, binding_rule_id):
return self.agent.http.put(
CB.json(), '/v1/acl/binding-rule/{}'.format(binding_rule_id))

def update_binding_rule(self, binding_rule_id, auth_method, bind_type, bind_name, description=None, selector=None):
return self.agent.http.put(
CB.json(), '/v1/acl/binding-rule/{}'.format(binding_rule_id), args_to_payload(locals()))

def delete_binding_rule(self, binding_rule_id):
return self.agent.http.delete(
CB.json(), '/v1/acl/binding-rule/{}'.format(binding_rule_id))

def list_binding_rules(self):
return self.agent.http.get(
CB.json(), '/v1/acl/binding-rules')

def list(self, token=None):
"""
Lists all the active ACL tokens. This is a privileged endpoint, and
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.