From 97a63280316dd7b4d22416a2408152089a8961cf Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Fri, 20 Nov 2020 12:38:53 +0100 Subject: [PATCH] bpo-18369: Add certificate and private key types Signed-off-by: Christian Heimes --- Lib/ssl.py | 2 +- Lib/test/test_ssl.py | 18 + .../2020-11-19-09-52-24.bpo-18369.qzvYH2.rst | 2 + Modules/_ssl.c | 217 +++++- Modules/_ssl.h | 101 ++- Modules/_ssl/cert.c | 715 ++++++++++++++++++ Modules/_ssl/clinic/cert.c.h | 525 +++++++++++++ Modules/_ssl/clinic/pkey.c.h | 115 +++ Modules/_ssl/clinic/truststore.c.h | 0 Modules/_ssl/misc.c | 83 ++ Modules/_ssl/pkey.c | 161 ++++ Modules/_ssl/truststore.c | 95 +++ Modules/clinic/_ssl.c.h | 27 +- setup.py | 10 +- 14 files changed, 2057 insertions(+), 14 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2020-11-19-09-52-24.bpo-18369.qzvYH2.rst create mode 100644 Modules/_ssl/cert.c create mode 100644 Modules/_ssl/clinic/cert.c.h create mode 100644 Modules/_ssl/clinic/pkey.c.h create mode 100644 Modules/_ssl/clinic/truststore.c.h create mode 100644 Modules/_ssl/misc.c create mode 100644 Modules/_ssl/pkey.c create mode 100644 Modules/_ssl/truststore.c diff --git a/Lib/ssl.py b/Lib/ssl.py index 9c1ba581dd66f5..12908761dce2d4 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -98,7 +98,7 @@ import _ssl # if we can't import it, let the error propagate from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION -from _ssl import _SSLContext, MemoryBIO, SSLSession +from _ssl import _SSLContext, MemoryBIO, SSLSession, Certificate, PrivateKey from _ssl import ( SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError, SSLSyscallError, SSLEOFError, SSLCertVerificationError diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 3ad14c63968e64..8f0f7f9ee4bcbf 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -1410,6 +1410,24 @@ def getpass(self): # Make sure the password function isn't called if it isn't needed ctx.load_cert_chain(CERTFILE, password=getpass_exception) + def test_load_cert_privkey(self): + chain = ssl.Certificate.chain_from_file(ONLYCERT) + self.assertEqual(len(chain), 1) + cas = ssl.Certificate.bundle_from_file(SIGNING_CA) + self.assertEqual(len(cas), 1) + pkey = ssl.PrivateKey.from_file(ONLYKEY) + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ctx.load_cert_chain(chain, pkey) + ctx.load_verify_locations(cadata=cas) + self.assertEqual(len(ctx.get_ca_certs()), 1) + + pkey = ssl.PrivateKey.from_file( + ONLYKEY_PROTECTED, password=KEY_PASSWORD + ) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ctx.load_cert_chain(chain, pkey) + def test_load_verify_locations(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ctx.load_verify_locations(CERTFILE) diff --git a/Misc/NEWS.d/next/Library/2020-11-19-09-52-24.bpo-18369.qzvYH2.rst b/Misc/NEWS.d/next/Library/2020-11-19-09-52-24.bpo-18369.qzvYH2.rst new file mode 100644 index 00000000000000..1b97afbd2c40ff --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-11-19-09-52-24.bpo-18369.qzvYH2.rst @@ -0,0 +1,2 @@ +Certificate and PrivateKey classes were added to the ssl module. +Certificates and keys can now be loaded from memory buffer, too. diff --git a/Modules/_ssl.c b/Modules/_ssl.c index 376d3bb11a40bb..bda11bb9f84fc8 100644 --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -321,6 +321,25 @@ typedef struct { PySSLContext *ctx; } PySSLSession; +<<<<<<< HEAD +||||||| parent of 27501a5ce86 (Work on trust store) +static PyTypeObject *PySSLContext_Type; +static PyTypeObject *PySSLSocket_Type; +static PyTypeObject *PySSLMemoryBIO_Type; +static PyTypeObject *PySSLSession_Type; +static PyTypeObject *PySSLPrivateKey_Type; +static PyTypeObject *PySSLCertificate_Type; + +======= +static PyTypeObject *PySSLContext_Type; +static PyTypeObject *PySSLSocket_Type; +static PyTypeObject *PySSLMemoryBIO_Type; +static PyTypeObject *PySSLSession_Type; +static PyTypeObject *PySSLPrivateKey_Type; +static PyTypeObject *PySSLCertificate_Type; +static PyTypeObject *PySSLTrustStore_Type; + +>>>>>>> 27501a5ce86 (Work on trust store) static inline _PySSLError _PySSL_errno(int failed, const SSL *ssl, int retcode) { _PySSLError err = { 0 }; @@ -1692,6 +1711,11 @@ _certificate_to_der(_sslmodulestate *state, X509 *certificate) return retval; } +#include "_ssl/misc.c" +#include "_ssl/cert.c" +#include "_ssl/pkey.c" +#include "_ssl/truststore.c" + /*[clinic input] _ssl._test_decode_cert path: object(converter="PyUnicode_FSConverter") @@ -1784,6 +1808,26 @@ _ssl__SSLSocket_getpeercert_impl(PySSLSocket *self, int binary_mode) return result; } +#if OPENSSL_VERSION_1_1 +/*[clinic input] +_ssl._SSLSocket.verified_chain + +[clinic start generated code]*/ + +static PyObject * +_ssl__SSLSocket_verified_chain_impl(PySSLSocket *self) +/*[clinic end generated code: output=3068b09c06b0a1eb input=d49eda1fa8963989]*/ +{ + STACK_OF(X509) *chain; + /* borrowed reference */ + chain = SSL_get0_verified_chain(self->ssl); + if (chain == NULL) { + Py_RETURN_NONE; + } + return _PySSL_CertificateFromX509Stack(chain, 1); +} +#endif + static PyObject * cipher_to_tuple(const SSL_CIPHER *cipher) { @@ -2799,6 +2843,7 @@ static PyMethodDef PySSLMethods[] = { _SSL__SSLSOCKET_COMPRESSION_METHODDEF _SSL__SSLSOCKET_SHUTDOWN_METHODDEF _SSL__SSLSOCKET_VERIFY_CLIENT_POST_HANDSHAKE_METHODDEF + _SSL__SSLSOCKET_VERIFIED_CHAIN_METHODDEF {NULL, NULL} }; @@ -3521,9 +3566,9 @@ typedef struct { int error; } _PySSLPasswordInfo; -static int -_pwinfo_set(_PySSLPasswordInfo *pw_info, PyObject* password, - const char *bad_type_error) +int +PySSL_pwinfo_set(PySSLPasswordInfo *pw_info, PyObject* password, + const char *bad_type_error) { /* Set the password and size fields of a _PySSLPasswordInfo struct from a unicode, bytes, or byte array object. @@ -3575,13 +3620,14 @@ _pwinfo_set(_PySSLPasswordInfo *pw_info, PyObject* password, return 0; } -static int -_password_callback(char *buf, int size, int rwflag, void *userdata) +int +PySSL_password_cb(char *buf, int size, int rwflag, void *userdata) { - _PySSLPasswordInfo *pw_info = (_PySSLPasswordInfo*) userdata; + PySSLPasswordInfo *pw_info = (PySSLPasswordInfo*) userdata; PyObject *fn_ret = NULL; PySSL_END_ALLOW_THREADS_S(pw_info->thread_state); + assert(pw_info); if (pw_info->error) { /* already failed previously. OpenSSL 3.0.0-alpha14 invokes the @@ -3598,7 +3644,7 @@ _password_callback(char *buf, int size, int rwflag, void *userdata) goto error; } - if (!_pwinfo_set(pw_info, fn_ret, + if (!PySSL_pwinfo_set(pw_info, fn_ret, "password callback must return a string")) { goto error; } @@ -3622,6 +3668,105 @@ _password_callback(char *buf, int size, int rwflag, void *userdata) return -1; } + +static PyObject * +_load_cert_privkey(PySSLContext *self, PyObject *certs, PyObject *key) +{ + PyObject *seq = NULL; + Py_ssize_t i; + int ret; + + /* Let's start picky and only accept tuples */ + if (!PyTuple_Check(certs)) { + PyErr_Format( + PyExc_TypeError, + "Expected a tuple of Certificates, got '%.200s'.", + Py_TYPE(certs)->tp_name + ); + return NULL; + } + if (Py_TYPE(key) != PySSLPrivateKey_Type) { + PyErr_Format( + PyExc_TypeError, + "Expected PrivateKey, got '%.200s'.", + Py_TYPE(key)->tp_name + ); + return NULL; + } + + seq = PySequence_Fast(certs, "expected a tuple of Certificates"); + if (seq == NULL) + return NULL; + if (PySequence_Fast_GET_SIZE(seq) == 0) { + PyErr_SetString(PyExc_ValueError, "cert list is empty"); + goto error; + } + + /* validate certs */ + for (i = 0; i < PySequence_Fast_GET_SIZE(seq); i++) { + PyObject *ob = PySequence_Fast_GET_ITEM(seq, i); + if (ob == NULL) { + goto error; + } + if (Py_TYPE(ob) != PySSLCertificate_Type) { + PyErr_Format( + PyExc_TypeError, + "Expected Certificate, got '%.200s'.", + Py_TYPE(ob)->tp_name + ); + goto error; + } + } + /* add certs */ + for (i = 0; i < PySequence_Fast_GET_SIZE(seq); i++) { + /* borrowed reference */ + PySSLCertificate *cert = (PySSLCertificate *)PySequence_Fast_GET_ITEM(seq, i); + if (cert == NULL) { + goto error; + } + if (i == 0) { + PySSL_BEGIN_ALLOW_THREADS + ret = SSL_CTX_use_certificate(self->ctx, cert->cert); + PySSL_END_ALLOW_THREADS + if (ERR_peek_error() != 0) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto error; + } + ret = SSL_CTX_clear_chain_certs(self->ctx); + if (ret == 0) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto error; + } + } + else { + /* certs 1..n are chain certs */ + PySSL_BEGIN_ALLOW_THREADS + ret = SSL_CTX_add1_chain_cert(self->ctx, cert->cert); + PySSL_END_ALLOW_THREADS + if (ret == 0) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto error; + } + } + } + /* add and verify private key */ + PySSL_BEGIN_ALLOW_THREADS + ret = SSL_CTX_use_PrivateKey(self->ctx, ((PySSLPrivateKey*)key)->pkey); + if (ret == 1) { + /* OK, now match key to cert */ + ret = SSL_CTX_check_private_key(self->ctx); + } + PySSL_END_ALLOW_THREADS + if (ret != 1) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto error; + } + Py_RETURN_NONE; + error: + Py_DECREF(seq); + return NULL; +} + /*[clinic input] _ssl._SSLContext.load_cert_chain certfile: object @@ -3638,13 +3783,21 @@ _ssl__SSLContext_load_cert_chain_impl(PySSLContext *self, PyObject *certfile, PyObject *certfile_bytes = NULL, *keyfile_bytes = NULL; pem_password_cb *orig_passwd_cb = SSL_CTX_get_default_passwd_cb(self->ctx); void *orig_passwd_userdata = SSL_CTX_get_default_passwd_cb_userdata(self->ctx); - _PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; int r; errno = 0; ERR_clear_error(); if (keyfile == Py_None) keyfile = NULL; + if ((keyfile != NULL) && (Py_TYPE(keyfile) == PySSLPrivateKey_Type)) { + if (password && password != Py_None) { + PyErr_SetString(PyExc_ValueError, + "Cannot use password arg with PrivateKey"); + return NULL; + } + return _load_cert_privkey(self, certfile, keyfile); + } if (!PyUnicode_FSConverter(certfile, &certfile_bytes)) { if (PyErr_ExceptionMatches(PyExc_TypeError)) { PyErr_SetString(PyExc_TypeError, @@ -3662,11 +3815,11 @@ _ssl__SSLContext_load_cert_chain_impl(PySSLContext *self, PyObject *certfile, if (password != Py_None) { if (PyCallable_Check(password)) { pw_info.callable = password; - } else if (!_pwinfo_set(&pw_info, password, + } else if (!PySSL_pwinfo_set(&pw_info, password, "password should be a string or callable")) { goto error; } - SSL_CTX_set_default_passwd_cb(self->ctx, _password_callback); + SSL_CTX_set_default_passwd_cb(self->ctx, PySSL_password_cb); SSL_CTX_set_default_passwd_cb_userdata(self->ctx, &pw_info); } PySSL_BEGIN_ALLOW_THREADS_S(pw_info.thread_state); @@ -3831,6 +3984,7 @@ _ssl__SSLContext_load_verify_locations_impl(PySSLContext *self, /*[clinic end generated code: output=454c7e41230ca551 input=42ecfe258233e194]*/ { PyObject *cafile_bytes = NULL, *capath_bytes = NULL; + PyObject *seq = NULL; const char *cafile_buf = NULL, *capath_buf = NULL; int r = 0, ok = 1; @@ -3899,6 +4053,48 @@ _ssl__SSLContext_load_verify_locations_impl(PySSLContext *self, goto error; } } + else if (PyList_Check(cadata)) { + /* List of Certificates instances */ + X509_STORE *store; + Py_ssize_t i; + + seq = PySequence_Fast(cadata, "expected a tuple of Certificates"); + if (seq == NULL) + goto error; + if (PySequence_Fast_GET_SIZE(seq) == 0) { + PyErr_SetString(PyExc_ValueError, + "cadata certificate list is empty"); + goto error; + } + store = SSL_CTX_get_cert_store(self->ctx); + /* validate and add certs */ + for (i = 0; i < PySequence_Fast_GET_SIZE(seq); i++) { + PySSLCertificate *ob = (PySSLCertificate *)PySequence_Fast_GET_ITEM(seq, i); + if (ob == NULL) { + goto error; + } + if (Py_TYPE(ob) != PySSLCertificate_Type) { + PyErr_Format( + PyExc_TypeError, + "Expected Certificate in cadata, got '%.200s'.", + Py_TYPE(ob)->tp_name + ); + goto error; + } + r = X509_STORE_add_cert(store, ob->cert); + if (!r) { + int err = ERR_peek_last_error(); + if ((ERR_GET_LIB(err) == ERR_LIB_X509) && + (ERR_GET_REASON(err) == X509_R_CERT_ALREADY_IN_HASH_TABLE)) { + /* cert already in hash table, not an error */ + ERR_clear_error(); + } else { + _setSSLError(NULL, 0, __FILE__, __LINE__); + goto error; + } + } + } + } else { invalid_cadata: PyErr_SetString(PyExc_TypeError, @@ -3935,6 +4131,7 @@ _ssl__SSLContext_load_verify_locations_impl(PySSLContext *self, end: Py_XDECREF(cafile_bytes); Py_XDECREF(capath_bytes); + Py_XDECREF(seq); if (ok) { Py_RETURN_NONE; } else { diff --git a/Modules/_ssl.h b/Modules/_ssl.h index 3fd517b6d3683f..74ace20493c1c0 100644 --- a/Modules/_ssl.h +++ b/Modules/_ssl.h @@ -1,6 +1,10 @@ #ifndef Py_SSL_H #define Py_SSL_H +/* OpenSSL header files */ +#include "openssl/evp.h" +#include "openssl/x509.h" + /* * ssl module state */ @@ -10,6 +14,8 @@ typedef struct { PyTypeObject *PySSLSocket_Type; PyTypeObject *PySSLMemoryBIO_Type; PyTypeObject *PySSLSession_Type; + PyTypeObject *PySSLPrivateKey_Type; + PyTypeObject *PySSLCertificate_Type; /* SSL error object */ PyObject *PySSLErrorObject; PyObject *PySSLCertVerificationErrorObject; @@ -40,6 +46,99 @@ get_ssl_state(PyObject *module) (get_ssl_state(_PyType_GetModuleByDef(type, &_sslmodule_def))) #define get_state_ctx(c) (((PySSLContext *)(c))->state) #define get_state_sock(s) (((PySSLSocket *)(s))->ctx->state) -#define get_state_mbio(b) ((_sslmodulestate *)PyType_GetModuleState(Py_TYPE(b))) +#define get_state_obj(o) ((_sslmodulestate *)PyType_GetModuleState(Py_TYPE(o))) +#define get_state_mbio(b) get_state_obj(b) +#define get_state_cert(c) get_state_obj(c) +#define get_state_pkey(p) get_state_obj(p) + +enum py_ssl_filetype { + PY_SSL_FILETYPE_PEM=X509_FILETYPE_PEM, + PY_SSL_FILETYPE_ASN1=X509_FILETYPE_ASN1, + PY_SSL_FILETYPE_PEM_AUX=X509_FILETYPE_PEM + 0x100, +}; + +typedef struct { + PyObject_HEAD + EVP_PKEY *pkey; +} PySSLPrivateKey; + +typedef struct { + PyObject_HEAD + X509 *cert; + Py_hash_t hash; +} PySSLCertificate; + +typedef struct { + PyObject_HEAD + X509_STORE *store; + /* OpenSSL 1.1.1 has no X509_STORE_dup() and X509_LOOKUP_dup. + * Keep a list of hash directories so we can copy them over. */ + PyObject *hash_dirs; +} PySSLTrustStore; + +/* ************************************************************************ + * helpers and utils + */ +<<<<<<< HEAD +static BIO *_PySSL_filebio(_sslmodulestate *state, PyObject *path); +static BIO *_PySSL_bufferbio(_sslmodulestate *state, Py_buffer *b); +static PyObject *_PySSL_BytesFromBIO(_sslmodulestate *state, BIO *bio); +#if 0 +static PyObject *_PySSL_UnicodeFromBIO(_sslmodulestate *state, BIO *bio, const char *error); +#endif +||||||| parent of 27501a5ce86 (Work on trust store) +static BIO *_PySSL_filebio(PyObject *path); +static BIO *_PySSL_bufferbio(Py_buffer *b); +static PyObject *_PySSL_BytesFromBIO(BIO *bio); +#if 0 +static PyObject *_PySSL_UnicodeFromBIO(BIO *bio, const char *error); +#endif +======= +static BIO *_PySSL_filebio(PyObject *path); +static BIO *_PySSL_bufferbio(Py_buffer *b); +static PyObject *_PySSL_BytesFromBIO(BIO *bio); +static PyObject *_PySSL_UnicodeFromBIO(BIO *bio, const char *error); +>>>>>>> 27501a5ce86 (Work on trust store) + +/* ************************************************************************ + * password callback + */ + +typedef struct { + PyThreadState *thread_state; + PyObject *callable; + char *password; + int size; + int error; +} PySSLPasswordInfo; + +#define PYSSL_PWINFO_INIT(pw_info, password, err) \ + if ((password) && (password) != Py_None) { \ + if (PyCallable_Check(password)) { \ + (pw_info)->callable = (password); \ + } else if (!PySSL_pwinfo_set((pw_info), (password), \ + "password should be a string or callable")) { \ + return (err); \ + } \ + } + +#define PYSSL_PWINFO_ERROR(state, pw_info) \ + if ((pw_info)->error) { \ + /* the password callback has already set the error information */ \ + ERR_clear_error(); \ + } \ + else if (errno != 0) { \ + ERR_clear_error(); \ + PyErr_SetFromErrno(PyExc_OSError); \ + } \ + else { \ + _setSSLError(state, NULL, 0, __FILE__, __LINE__); \ + } + +static int +PySSL_pwinfo_set(PySSLPasswordInfo *pw_info, PyObject* password, + const char *bad_type_error); +static int +PySSL_password_cb(char *buf, int size, int rwflag, void *userdata); #endif /* Py_SSL_H */ diff --git a/Modules/_ssl/cert.c b/Modules/_ssl/cert.c new file mode 100644 index 00000000000000..e75badedb597d6 --- /dev/null +++ b/Modules/_ssl/cert.c @@ -0,0 +1,715 @@ +#include "Python.h" +#include "../_ssl.h" + +#include "openssl/err.h" +#include "openssl/bio.h" +#include "openssl/pem.h" +#include "openssl/x509.h" + +/*[clinic input] +module _ssl +class _ssl.Certificate "PySSLCertificate *" "PySSLCertificate_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=780fc647948cfffc]*/ + +#include "clinic/cert.c.h" + +static PyObject * +newCertificate(PyTypeObject *type, X509 *cert, int upref) +{ + PySSLCertificate *self; + + assert(type != NULL && type->tp_alloc != NULL); + assert(cert != NULL); + + self = (PySSLCertificate *) type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + if (upref == 1) { + X509_up_ref(cert); + } + self->cert = cert; + self->hash = -1; + + return (PyObject *) self; +} + +static PyObject * +_PySSL_CertificateFromX509(_sslmodulestate *state, X509 *cert, int upref) +{ + return newCertificate(state->PySSLCertificate_Type, cert, upref); +} + +static PyObject* +_PySSL_CertificateFromX509Stack(_sslmodulestate *state, STACK_OF(X509) *stack, int upref) +{ + int len, i; + PyObject *result = NULL; + + len = sk_X509_num(stack); + result = PyTuple_New(len); + if (result == NULL) { + return NULL; + } + for (i = 0; i < len; i++) { + X509 *cert = sk_X509_value(stack, i); + PyObject *ocert = _PySSL_CertificateFromX509(state, cert, upref); + if (ocert == NULL) { + Py_DECREF(result); + return NULL; + } + PyTuple_SET_ITEM(result, i, ocert); + } + return result; +} + +/* ************************************************************************ + * cert from file/buffer + */ + +static X509 * +read_cert_bio(BIO *bio, int format, PySSLPasswordInfo *pw_info) +{ + X509 *cert = NULL; + + switch(format) { + case PY_SSL_FILETYPE_PEM: + PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state); + cert = PEM_read_bio_X509(bio, NULL, PySSL_password_cb, pw_info); + PySSL_END_ALLOW_THREADS_S(pw_info->thread_state); + break; + case PY_SSL_FILETYPE_PEM_AUX: + PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state); + cert = PEM_read_bio_X509_AUX(bio, NULL, PySSL_password_cb, pw_info); + PySSL_END_ALLOW_THREADS_S(pw_info->thread_state); + break; + case PY_SSL_FILETYPE_ASN1: + PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state); + cert = d2i_X509_bio(bio, NULL); + PySSL_END_ALLOW_THREADS_S(pw_info->thread_state); + break; + default: + PyErr_SetString(PyExc_ValueError, "Unsupported format"); + return NULL; + } + + if (cert == NULL) { + PYSSL_PWINFO_ERROR(pw_info) + return NULL; + } + return cert; +} + +/*[clinic input] +@classmethod +_ssl.Certificate.from_file + path: object(converter="PyUnicode_FSConverter") + * + password: object = None + format: int(c_default="PY_SSL_FILETYPE_PEM") = FILETYPE_PEM + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_from_file_impl(PyTypeObject *type, PyObject *path, + PyObject *password, int format) +/*[clinic end generated code: output=9df64b603c7d0682 input=77eaf44d6c8fc08a]*/ +{ + X509 *cert = NULL; + BIO *bio; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + + PYSSL_PWINFO_INIT(&pw_info, password, NULL) + + bio = _PySSL_filebio(path); + if (bio == NULL) { + return NULL; + } + cert = read_cert_bio(bio, format, &pw_info); + BIO_free(bio); + if (cert == NULL) { + return NULL; + } + return newCertificate(type, cert, 0); +} + +/*[clinic input] +@classmethod +_ssl.Certificate.from_buffer + buffer: Py_buffer + * + format: int(c_default="PY_SSL_FILETYPE_PEM") = FILETYPE_PEM +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_from_buffer_impl(PyTypeObject *type, Py_buffer *buffer, + int format) +/*[clinic end generated code: output=1c4d18d114c22482 input=b8affe0f610e8311]*/ +{ + X509 *cert = NULL; + BIO *bio; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + + PYSSL_PWINFO_INIT(&pw_info, NULL, NULL) + + bio = _PySSL_bufferbio(buffer); + if (bio == NULL) { + return NULL; + } + cert = read_cert_bio(bio, format, &pw_info); + BIO_free(bio); + if (cert == NULL) { + return NULL; + } + return newCertificate(type, cert, 0); +} + +/* ************************************************************************ + * cert chain from file/buffer + */ + +static PyObject * +read_certchain_bio(PyTypeObject *type, BIO *bio, PySSLPasswordInfo *pw_info) +{ + PyObject *lst, *certob; + X509 *cert; + int retcode; + int count = 0; + + lst = PyList_New(0); + if (lst == NULL) { + return NULL; + } + + while (1) { + PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state); + if (count == 0) + /* End-entity cert uses AUX */ + cert = PEM_read_bio_X509_AUX(bio, NULL, PySSL_password_cb, pw_info); + else + /* The rest of the chain doesn't use AUX */ + cert = PEM_read_bio_X509(bio, NULL, PySSL_password_cb, pw_info); + PySSL_END_ALLOW_THREADS_S(pw_info->thread_state); + + if (cert == NULL) { + if (count == 0) { + /* First cert must load */ + PYSSL_PWINFO_ERROR(pw_info) + goto error; + } + else { + /* Read cert 1..n until EOF is reached. */ + int err = ERR_peek_last_error(); + if (ERR_GET_LIB(err) == ERR_LIB_PEM + && ERR_GET_REASON(err) == PEM_R_NO_START_LINE) { + /* EOF, return result as tuple */ + PyObject *tup; + ERR_clear_error(); + tup = PyList_AsTuple(lst); + Py_DECREF(lst); + return tup; + } else { + PYSSL_PWINFO_ERROR(pw_info) + goto error; + } + } + } + certob = newCertificate(type, cert, 0); + if (certob == NULL) { + goto error; + } + retcode = PyList_Append(lst, certob); + Py_DECREF(certob); + if (retcode < 0) { + goto error; + } + count++; + } + Py_UNREACHABLE(); + error: + Py_DECREF(lst); + return NULL; +} + +/*[clinic input] +@classmethod +_ssl.Certificate.chain_from_file + path: object(converter="PyUnicode_FSConverter") + * + password: object = None +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_chain_from_file_impl(PyTypeObject *type, PyObject *path, + PyObject *password) +/*[clinic end generated code: output=4883269e5ed24b7e input=8fe5316e08838397]*/ +{ + PyObject *chain; + BIO *bio; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + + PYSSL_PWINFO_INIT(&pw_info, password, NULL) + + bio = _PySSL_filebio(path); + if (bio == NULL) { + return NULL; + } + chain = read_certchain_bio(type, bio, &pw_info); + BIO_free(bio); + return chain; +} + +/*[clinic input] +@classmethod +_ssl.Certificate.chain_from_buffer + buffer: Py_buffer +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_chain_from_buffer_impl(PyTypeObject *type, + Py_buffer *buffer) +/*[clinic end generated code: output=ce4f74c07fec525e input=db88ea3ca79f7415]*/ +{ + PyObject *chain; + BIO *bio; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + + PYSSL_PWINFO_INIT(&pw_info, NULL, NULL) + + bio = _PySSL_bufferbio(buffer); + if (bio == NULL) { + return NULL; + } + chain = read_certchain_bio(type, bio, &pw_info); + BIO_free(bio); + return chain; +} + +/* ************************************************************************ + * cert bundle from file/buffer, returns List[Certificate] + */ + +static PyObject * +read_certbundle_bio(PyTypeObject *type, BIO *bio, int format, PySSLPasswordInfo *pw_info) +{ + PyObject *lst, *certob; + X509 *cert; + int retcode; + int count = 0; + + if (format != PY_SSL_FILETYPE_PEM_AUX && format != PY_SSL_FILETYPE_PEM) { + PyErr_SetString(PyExc_ValueError, "Unsupported format"); + return NULL; + } + + lst = PyList_New(0); + if (lst == NULL) { + return NULL; + } + + while (1) { + PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state); + if (format == PY_SSL_FILETYPE_PEM) + cert = PEM_read_bio_X509_AUX(bio, NULL, PySSL_password_cb, pw_info); + else + cert = PEM_read_bio_X509(bio, NULL, PySSL_password_cb, pw_info); + PySSL_END_ALLOW_THREADS_S(pw_info->thread_state); + + if (cert == NULL) { + int err = ERR_peek_last_error(); + if (ERR_GET_LIB(err) == ERR_LIB_PEM + && ERR_GET_REASON(err) == PEM_R_NO_START_LINE) { + /* EOF, return result as list */ + ERR_clear_error(); + return lst; + } else { + PYSSL_PWINFO_ERROR(pw_info) + goto error; + } + } + certob = newCertificate(type, cert, 0); + if (certob == NULL) { + goto error; + } + retcode = PyList_Append(lst, certob); + Py_DECREF(certob); + if (retcode < 0) { + goto error; + } + count++; + } + Py_UNREACHABLE(); + error: + Py_DECREF(lst); + return NULL; +} + +/*[clinic input] +@classmethod +_ssl.Certificate.bundle_from_file + path: object(converter="PyUnicode_FSConverter") + * + format: int(c_default="PY_SSL_FILETYPE_PEM") = FILETYPE_PEM +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_bundle_from_file_impl(PyTypeObject *type, PyObject *path, + int format) +/*[clinic end generated code: output=a44c0f0fd831487b input=f29ad47b773d8167]*/ +{ + PyObject *certs; + BIO *bio; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + + PYSSL_PWINFO_INIT(&pw_info, NULL, NULL) + + bio = _PySSL_filebio(path); + if (bio == NULL) { + return NULL; + } + certs = read_certbundle_bio(type, bio, format, &pw_info); + BIO_free(bio); + return certs; +} + +/*[clinic input] +@classmethod +_ssl.Certificate.bundle_from_buffer + buffer: Py_buffer + * + format: int(c_default="PY_SSL_FILETYPE_PEM") = FILETYPE_PEM +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_bundle_from_buffer_impl(PyTypeObject *type, + Py_buffer *buffer, int format) +/*[clinic end generated code: output=75930fc21796664c input=a273edd7a4b6d130]*/ +{ + PyObject *certs; + BIO *bio; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + + PYSSL_PWINFO_INIT(&pw_info, NULL, NULL) + + bio = _PySSL_bufferbio(buffer); + if (bio == NULL) { + return NULL; + } + certs = read_certbundle_bio(type, bio, format, &pw_info); + BIO_free(bio); + return certs; +} + +/* ************************************************************************ + * misc methods + */ + +/*[clinic input] +_ssl.Certificate.check_hostname + hostname: str(encoding='idna', accept={bytes, bytearray, str}) + * + flags: unsigned_int(bitwise=True) = 0 + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_check_hostname_impl(PySSLCertificate *self, char *hostname, + unsigned int flags) +/*[clinic end generated code: output=ba5a8a01cb6ef23c input=68769dc729509c7b]*/ +{ + int retcode; + char *peername = NULL; + PyObject *result; + + retcode = X509_check_host(self->cert, hostname, strlen(hostname), flags, &peername); + if (retcode != 1) { + if (peername != NULL) { + OPENSSL_free(peername); + } + Py_RETURN_FALSE; + } + assert(peername); + result = PyUnicode_FromString(peername); + OPENSSL_free(peername); + return result; +} + +/*[clinic input] +_ssl.Certificate.check_ipaddress + address: str + * + flags: unsigned_int(bitwise=True) = 0 + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_check_ipaddress_impl(PySSLCertificate *self, + const char *address, + unsigned int flags) +/*[clinic end generated code: output=c85f838ca66238dd input=98eb596458ecbc68]*/ +{ + int retcode; + retcode = X509_check_ip_asc(self->cert, address, flags); + if (retcode != 1) { + Py_RETURN_FALSE; + } + Py_RETURN_TRUE; +} + +/*[clinic input] +_ssl.Certificate.dumps + format: int(c_default="PY_SSL_FILETYPE_PEM") = FILETYPE_PEM + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_dumps_impl(PySSLCertificate *self, int format) +/*[clinic end generated code: output=5403de70ad7cfcfe input=39ea9c60220b1999]*/ +{ + BIO *bio; + int retcode; + PyObject *result; + + bio = BIO_new(BIO_s_mem()); + if (bio == NULL) { + PyErr_SetString(PySSLErrorObject, + "failed to allocate BIO"); + return NULL; + } + /* release GIL? */ + switch(format) { + case PY_SSL_FILETYPE_PEM: + retcode = PEM_write_bio_X509(bio, self->cert); + break; + case PY_SSL_FILETYPE_PEM_AUX: + retcode = PEM_write_bio_X509_AUX(bio, self->cert); + break; + case PY_SSL_FILETYPE_ASN1: + retcode = i2d_X509_bio(bio, self->cert); + break; + default: + PyErr_SetString(PyExc_ValueError, "Unsupported format"); + BIO_free(bio); + return NULL; + } + if (retcode != 1) { + BIO_free(bio); + _setSSLError(NULL, 0, __FILE__, __LINE__); + return NULL; + } + result = _PySSL_BytesFromBIO(bio); + BIO_free(bio); + return result; +} + + +/*[clinic input] +_ssl.Certificate.get_info + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_get_info_impl(PySSLCertificate *self) +/*[clinic end generated code: output=0f0deaac54f4408b input=ba2c1694b39d0778]*/ +{ + return _decode_certificate(self->cert); +} + +static PyObject* +_x509name_print(X509_NAME *name, int indent, unsigned long flags) +{ + PyObject *res; + BIO *biobuf; + + biobuf = BIO_new(BIO_s_mem()); + if (biobuf == NULL) { + PyErr_SetString(PyExc_MemoryError, "failed to allocate BIO"); + return NULL; + } + + if (X509_NAME_print_ex(biobuf, name, indent, flags) <= 0) { + _setSSLError(NULL, 0, __FILE__, __LINE__); + BIO_free(biobuf); + return NULL; + } + res = _PySSL_UnicodeFromBIO(biobuf, "strict"); + BIO_free(biobuf); + return res; +} + +static PyObject* +_x509name_convert(X509_NAME *name, PyObject *oflags) +{ + if (oflags == Py_None) { + return _create_tuple_for_X509_NAME(name); + } + unsigned long flags = PyLong_AsUnsignedLongMask(oflags); + if (flags == (unsigned long)-1 && PyErr_Occurred()) { + return NULL; + } + return _x509name_print(name, 0, flags); +} + +/*[clinic input] +_ssl.Certificate.get_issuer + * + flags: object = None + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_get_issuer_impl(PySSLCertificate *self, PyObject *flags) +/*[clinic end generated code: output=2cf445ff11efbdbb input=f804b370c2de93ca]*/ +{ + return _x509name_convert(X509_get_issuer_name(self->cert), flags); +} + +/*[clinic input] +_ssl.Certificate.get_subject + * + flags: object = None + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_get_subject_impl(PySSLCertificate *self, PyObject *flags) +/*[clinic end generated code: output=7c6a7829974c90e8 input=8d7072cd3e57e3a8]*/ +{ + return _x509name_convert(X509_get_subject_name(self->cert), flags); +} + +/*[clinic input] +_ssl.Certificate.get_spki + +[clinic start generated code]*/ + +static PyObject * +_ssl_Certificate_get_spki_impl(PySSLCertificate *self) +/*[clinic end generated code: output=93ca3624873a9c0d input=342685f656a52052]*/ +{ + unsigned char *buf; + int len; + PyObject *spki; + X509_PUBKEY *pkey; + + /* TODO: add error checks */ + pkey = X509_get_X509_PUBKEY(self->cert); + assert(pkey != NULL); + len = i2d_X509_PUBKEY(pkey, NULL); + + spki = PyBytes_FromStringAndSize(NULL, len); + if (spki == NULL) + return NULL; + + buf = (unsigned char *)PyBytes_AS_STRING(spki); + i2d_X509_PUBKEY(pkey, &buf); + return spki; +} + +/* ************************************************************************ + * PySSLCertificate_Type + */ + +static PyObject * +certificate_repr(PySSLCertificate *self) +{ + PyObject *osubject, *result; + + /* subject string is ASCII encoded, UTF-8 chars are quoted */ + osubject = _x509name_print( + X509_get_subject_name(self->cert), 0, XN_FLAG_RFC2253); + if (osubject == NULL) + return NULL; + result = PyUnicode_FromFormat( + "<%s '%U'>", + Py_TYPE(self)->tp_name, osubject + ); + Py_DECREF(osubject); + return result; +} + +static Py_hash_t +certificate_hash(PySSLCertificate *self) +{ + if (self->hash == (Py_hash_t)-1) { + unsigned long hash; + hash = X509_subject_name_hash(self->cert); + if ((Py_hash_t)hash == (Py_hash_t)-1) { + self->hash = -2; + } else { + self->hash = (Py_hash_t)hash; + } + } + return self->hash; +} + +static PyObject * +certificate_richcompare(PySSLCertificate *self, PyObject *other, int op) +{ + int cmp; + + if (Py_TYPE(other) != PySSLCertificate_Type) { + Py_RETURN_NOTIMPLEMENTED; + } + /* only support == and != */ + if ((op != Py_EQ) && (op != Py_NE)) { + Py_RETURN_NOTIMPLEMENTED; + } + cmp = X509_cmp(self->cert, ((PySSLCertificate*)other)->cert); + if (((op == Py_EQ) && (cmp == 0)) || ((op == Py_NE) && (cmp != 0))) { + Py_RETURN_TRUE; + } else { + Py_RETURN_FALSE; + } +} + +static void +certificate_dealloc(PySSLCertificate *self) +{ + PyTypeObject *tp = Py_TYPE(self); + X509_free(self->cert); + Py_TYPE(self)->tp_free(self); + Py_DECREF(tp); +} + +static PyGetSetDef certificate_getsetlist[] = { + {NULL} /* sentinel */ +}; + +static PyMethodDef certificate_methods[] = { + /* constructors */ + _SSL_CERTIFICATE_FROM_FILE_METHODDEF + _SSL_CERTIFICATE_FROM_BUFFER_METHODDEF + _SSL_CERTIFICATE_CHAIN_FROM_FILE_METHODDEF + _SSL_CERTIFICATE_CHAIN_FROM_BUFFER_METHODDEF + _SSL_CERTIFICATE_BUNDLE_FROM_FILE_METHODDEF + _SSL_CERTIFICATE_BUNDLE_FROM_BUFFER_METHODDEF + /* methods */ + _SSL_CERTIFICATE_CHECK_HOSTNAME_METHODDEF + _SSL_CERTIFICATE_CHECK_IPADDRESS_METHODDEF + _SSL_CERTIFICATE_DUMPS_METHODDEF + _SSL_CERTIFICATE_GET_INFO_METHODDEF + _SSL_CERTIFICATE_GET_ISSUER_METHODDEF + _SSL_CERTIFICATE_GET_SUBJECT_METHODDEF + _SSL_CERTIFICATE_GET_SPKI_METHODDEF + {NULL, NULL} +}; + +static PyType_Slot PySSLCertificate_slots[] = { + {Py_tp_dealloc, certificate_dealloc}, + {Py_tp_repr, certificate_repr}, + {Py_tp_hash, certificate_hash}, + {Py_tp_richcompare, certificate_richcompare}, + {Py_tp_methods, certificate_methods}, + {Py_tp_getset, certificate_getsetlist}, + {0, 0}, +}; + +static PyType_Spec PySSLCertificate_spec = { + "_ssl.Certificate", + sizeof(PySSLCertificate), + 0, + Py_TPFLAGS_DEFAULT, + PySSLCertificate_slots, +}; diff --git a/Modules/_ssl/clinic/cert.c.h b/Modules/_ssl/clinic/cert.c.h new file mode 100644 index 00000000000000..a8efd801d484de --- /dev/null +++ b/Modules/_ssl/clinic/cert.c.h @@ -0,0 +1,525 @@ +/*[clinic input] +preserve +[clinic start generated code]*/ + +PyDoc_STRVAR(_ssl_Certificate_from_file__doc__, +"from_file($type, /, path, *, password=None, format=FILETYPE_PEM)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_FROM_FILE_METHODDEF \ + {"from_file", (PyCFunction)(void(*)(void))_ssl_Certificate_from_file, METH_FASTCALL|METH_KEYWORDS|METH_CLASS, _ssl_Certificate_from_file__doc__}, + +static PyObject * +_ssl_Certificate_from_file_impl(PyTypeObject *type, PyObject *path, + PyObject *password, int format); + +static PyObject * +_ssl_Certificate_from_file(PyTypeObject *type, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"path", "password", "format", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "from_file", 0}; + PyObject *argsbuf[3]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1; + PyObject *path; + PyObject *password = Py_None; + int format = PY_SSL_FILETYPE_PEM; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!PyUnicode_FSConverter(args[0], &path)) { + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + if (args[1]) { + password = args[1]; + if (!--noptargs) { + goto skip_optional_kwonly; + } + } + format = _PyLong_AsInt(args[2]); + if (format == -1 && PyErr_Occurred()) { + goto exit; + } +skip_optional_kwonly: + return_value = _ssl_Certificate_from_file_impl(type, path, password, format); + +exit: + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_from_buffer__doc__, +"from_buffer($type, /, buffer, *, format=FILETYPE_PEM)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_FROM_BUFFER_METHODDEF \ + {"from_buffer", (PyCFunction)(void(*)(void))_ssl_Certificate_from_buffer, METH_FASTCALL|METH_KEYWORDS|METH_CLASS, _ssl_Certificate_from_buffer__doc__}, + +static PyObject * +_ssl_Certificate_from_buffer_impl(PyTypeObject *type, Py_buffer *buffer, + int format); + +static PyObject * +_ssl_Certificate_from_buffer(PyTypeObject *type, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"buffer", "format", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "from_buffer", 0}; + PyObject *argsbuf[2]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1; + Py_buffer buffer = {NULL, NULL}; + int format = PY_SSL_FILETYPE_PEM; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (PyObject_GetBuffer(args[0], &buffer, PyBUF_SIMPLE) != 0) { + goto exit; + } + if (!PyBuffer_IsContiguous(&buffer, 'C')) { + _PyArg_BadArgument("from_buffer", "argument 'buffer'", "contiguous buffer", args[0]); + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + format = _PyLong_AsInt(args[1]); + if (format == -1 && PyErr_Occurred()) { + goto exit; + } +skip_optional_kwonly: + return_value = _ssl_Certificate_from_buffer_impl(type, &buffer, format); + +exit: + /* Cleanup for buffer */ + if (buffer.obj) { + PyBuffer_Release(&buffer); + } + + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_chain_from_file__doc__, +"chain_from_file($type, /, path, *, password=None)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_CHAIN_FROM_FILE_METHODDEF \ + {"chain_from_file", (PyCFunction)(void(*)(void))_ssl_Certificate_chain_from_file, METH_FASTCALL|METH_KEYWORDS|METH_CLASS, _ssl_Certificate_chain_from_file__doc__}, + +static PyObject * +_ssl_Certificate_chain_from_file_impl(PyTypeObject *type, PyObject *path, + PyObject *password); + +static PyObject * +_ssl_Certificate_chain_from_file(PyTypeObject *type, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"path", "password", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "chain_from_file", 0}; + PyObject *argsbuf[2]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1; + PyObject *path; + PyObject *password = Py_None; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!PyUnicode_FSConverter(args[0], &path)) { + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + password = args[1]; +skip_optional_kwonly: + return_value = _ssl_Certificate_chain_from_file_impl(type, path, password); + +exit: + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_chain_from_buffer__doc__, +"chain_from_buffer($type, /, buffer)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_CHAIN_FROM_BUFFER_METHODDEF \ + {"chain_from_buffer", (PyCFunction)(void(*)(void))_ssl_Certificate_chain_from_buffer, METH_FASTCALL|METH_KEYWORDS|METH_CLASS, _ssl_Certificate_chain_from_buffer__doc__}, + +static PyObject * +_ssl_Certificate_chain_from_buffer_impl(PyTypeObject *type, + Py_buffer *buffer); + +static PyObject * +_ssl_Certificate_chain_from_buffer(PyTypeObject *type, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"buffer", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "chain_from_buffer", 0}; + PyObject *argsbuf[1]; + Py_buffer buffer = {NULL, NULL}; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (PyObject_GetBuffer(args[0], &buffer, PyBUF_SIMPLE) != 0) { + goto exit; + } + if (!PyBuffer_IsContiguous(&buffer, 'C')) { + _PyArg_BadArgument("chain_from_buffer", "argument 'buffer'", "contiguous buffer", args[0]); + goto exit; + } + return_value = _ssl_Certificate_chain_from_buffer_impl(type, &buffer); + +exit: + /* Cleanup for buffer */ + if (buffer.obj) { + PyBuffer_Release(&buffer); + } + + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_bundle_from_file__doc__, +"bundle_from_file($type, /, path, *, format=FILETYPE_PEM)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_BUNDLE_FROM_FILE_METHODDEF \ + {"bundle_from_file", (PyCFunction)(void(*)(void))_ssl_Certificate_bundle_from_file, METH_FASTCALL|METH_KEYWORDS|METH_CLASS, _ssl_Certificate_bundle_from_file__doc__}, + +static PyObject * +_ssl_Certificate_bundle_from_file_impl(PyTypeObject *type, PyObject *path, + int format); + +static PyObject * +_ssl_Certificate_bundle_from_file(PyTypeObject *type, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"path", "format", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "bundle_from_file", 0}; + PyObject *argsbuf[2]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1; + PyObject *path; + int format = PY_SSL_FILETYPE_PEM; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!PyUnicode_FSConverter(args[0], &path)) { + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + format = _PyLong_AsInt(args[1]); + if (format == -1 && PyErr_Occurred()) { + goto exit; + } +skip_optional_kwonly: + return_value = _ssl_Certificate_bundle_from_file_impl(type, path, format); + +exit: + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_bundle_from_buffer__doc__, +"bundle_from_buffer($type, /, buffer, *, format=FILETYPE_PEM)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_BUNDLE_FROM_BUFFER_METHODDEF \ + {"bundle_from_buffer", (PyCFunction)(void(*)(void))_ssl_Certificate_bundle_from_buffer, METH_FASTCALL|METH_KEYWORDS|METH_CLASS, _ssl_Certificate_bundle_from_buffer__doc__}, + +static PyObject * +_ssl_Certificate_bundle_from_buffer_impl(PyTypeObject *type, + Py_buffer *buffer, int format); + +static PyObject * +_ssl_Certificate_bundle_from_buffer(PyTypeObject *type, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"buffer", "format", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "bundle_from_buffer", 0}; + PyObject *argsbuf[2]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1; + Py_buffer buffer = {NULL, NULL}; + int format = PY_SSL_FILETYPE_PEM; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (PyObject_GetBuffer(args[0], &buffer, PyBUF_SIMPLE) != 0) { + goto exit; + } + if (!PyBuffer_IsContiguous(&buffer, 'C')) { + _PyArg_BadArgument("bundle_from_buffer", "argument 'buffer'", "contiguous buffer", args[0]); + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + format = _PyLong_AsInt(args[1]); + if (format == -1 && PyErr_Occurred()) { + goto exit; + } +skip_optional_kwonly: + return_value = _ssl_Certificate_bundle_from_buffer_impl(type, &buffer, format); + +exit: + /* Cleanup for buffer */ + if (buffer.obj) { + PyBuffer_Release(&buffer); + } + + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_check_hostname__doc__, +"check_hostname($self, /, hostname, *, flags=0)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_CHECK_HOSTNAME_METHODDEF \ + {"check_hostname", (PyCFunction)(void(*)(void))_ssl_Certificate_check_hostname, METH_FASTCALL|METH_KEYWORDS, _ssl_Certificate_check_hostname__doc__}, + +static PyObject * +_ssl_Certificate_check_hostname_impl(PySSLCertificate *self, char *hostname, + unsigned int flags); + +static PyObject * +_ssl_Certificate_check_hostname(PySSLCertificate *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"hostname", "flags", NULL}; + static _PyArg_Parser _parser = {"et|$I:check_hostname", _keywords, 0}; + char *hostname = NULL; + unsigned int flags = 0; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + "idna", &hostname, &flags)) { + goto exit; + } + return_value = _ssl_Certificate_check_hostname_impl(self, hostname, flags); + +exit: + /* Cleanup for hostname */ + if (hostname) { + PyMem_FREE(hostname); + } + + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_check_ipaddress__doc__, +"check_ipaddress($self, /, address, *, flags=0)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_CHECK_IPADDRESS_METHODDEF \ + {"check_ipaddress", (PyCFunction)(void(*)(void))_ssl_Certificate_check_ipaddress, METH_FASTCALL|METH_KEYWORDS, _ssl_Certificate_check_ipaddress__doc__}, + +static PyObject * +_ssl_Certificate_check_ipaddress_impl(PySSLCertificate *self, + const char *address, + unsigned int flags); + +static PyObject * +_ssl_Certificate_check_ipaddress(PySSLCertificate *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"address", "flags", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "check_ipaddress", 0}; + PyObject *argsbuf[2]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1; + const char *address; + unsigned int flags = 0; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!PyUnicode_Check(args[0])) { + _PyArg_BadArgument("check_ipaddress", "argument 'address'", "str", args[0]); + goto exit; + } + Py_ssize_t address_length; + address = PyUnicode_AsUTF8AndSize(args[0], &address_length); + if (address == NULL) { + goto exit; + } + if (strlen(address) != (size_t)address_length) { + PyErr_SetString(PyExc_ValueError, "embedded null character"); + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + flags = (unsigned int)PyLong_AsUnsignedLongMask(args[1]); + if (flags == (unsigned int)-1 && PyErr_Occurred()) { + goto exit; + } +skip_optional_kwonly: + return_value = _ssl_Certificate_check_ipaddress_impl(self, address, flags); + +exit: + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_dumps__doc__, +"dumps($self, /, format=FILETYPE_PEM)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_DUMPS_METHODDEF \ + {"dumps", (PyCFunction)(void(*)(void))_ssl_Certificate_dumps, METH_FASTCALL|METH_KEYWORDS, _ssl_Certificate_dumps__doc__}, + +static PyObject * +_ssl_Certificate_dumps_impl(PySSLCertificate *self, int format); + +static PyObject * +_ssl_Certificate_dumps(PySSLCertificate *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"format", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "dumps", 0}; + PyObject *argsbuf[1]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0; + int format = PY_SSL_FILETYPE_PEM; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!noptargs) { + goto skip_optional_pos; + } + format = _PyLong_AsInt(args[0]); + if (format == -1 && PyErr_Occurred()) { + goto exit; + } +skip_optional_pos: + return_value = _ssl_Certificate_dumps_impl(self, format); + +exit: + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_get_info__doc__, +"get_info($self, /)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_GET_INFO_METHODDEF \ + {"get_info", (PyCFunction)_ssl_Certificate_get_info, METH_NOARGS, _ssl_Certificate_get_info__doc__}, + +static PyObject * +_ssl_Certificate_get_info_impl(PySSLCertificate *self); + +static PyObject * +_ssl_Certificate_get_info(PySSLCertificate *self, PyObject *Py_UNUSED(ignored)) +{ + return _ssl_Certificate_get_info_impl(self); +} + +PyDoc_STRVAR(_ssl_Certificate_get_issuer__doc__, +"get_issuer($self, /, *, flags=None)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_GET_ISSUER_METHODDEF \ + {"get_issuer", (PyCFunction)(void(*)(void))_ssl_Certificate_get_issuer, METH_FASTCALL|METH_KEYWORDS, _ssl_Certificate_get_issuer__doc__}, + +static PyObject * +_ssl_Certificate_get_issuer_impl(PySSLCertificate *self, PyObject *flags); + +static PyObject * +_ssl_Certificate_get_issuer(PySSLCertificate *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"flags", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "get_issuer", 0}; + PyObject *argsbuf[1]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0; + PyObject *flags = Py_None; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 0, 0, argsbuf); + if (!args) { + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + flags = args[0]; +skip_optional_kwonly: + return_value = _ssl_Certificate_get_issuer_impl(self, flags); + +exit: + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_get_subject__doc__, +"get_subject($self, /, *, flags=None)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_GET_SUBJECT_METHODDEF \ + {"get_subject", (PyCFunction)(void(*)(void))_ssl_Certificate_get_subject, METH_FASTCALL|METH_KEYWORDS, _ssl_Certificate_get_subject__doc__}, + +static PyObject * +_ssl_Certificate_get_subject_impl(PySSLCertificate *self, PyObject *flags); + +static PyObject * +_ssl_Certificate_get_subject(PySSLCertificate *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"flags", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "get_subject", 0}; + PyObject *argsbuf[1]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 0; + PyObject *flags = Py_None; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 0, 0, 0, argsbuf); + if (!args) { + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + flags = args[0]; +skip_optional_kwonly: + return_value = _ssl_Certificate_get_subject_impl(self, flags); + +exit: + return return_value; +} + +PyDoc_STRVAR(_ssl_Certificate_get_spki__doc__, +"get_spki($self, /)\n" +"--\n" +"\n"); + +#define _SSL_CERTIFICATE_GET_SPKI_METHODDEF \ + {"get_spki", (PyCFunction)_ssl_Certificate_get_spki, METH_NOARGS, _ssl_Certificate_get_spki__doc__}, + +static PyObject * +_ssl_Certificate_get_spki_impl(PySSLCertificate *self); + +static PyObject * +_ssl_Certificate_get_spki(PySSLCertificate *self, PyObject *Py_UNUSED(ignored)) +{ + return _ssl_Certificate_get_spki_impl(self); +} +/*[clinic end generated code: output=54803b371d3e7889 input=a9049054013a1b77]*/ diff --git a/Modules/_ssl/clinic/pkey.c.h b/Modules/_ssl/clinic/pkey.c.h new file mode 100644 index 00000000000000..a6d0cc760364f7 --- /dev/null +++ b/Modules/_ssl/clinic/pkey.c.h @@ -0,0 +1,115 @@ +/*[clinic input] +preserve +[clinic start generated code]*/ + +PyDoc_STRVAR(_ssl_PrivateKey_from_file__doc__, +"from_file($type, /, path, *, password=None, format=FILETYPE_PEM)\n" +"--\n" +"\n"); + +#define _SSL_PRIVATEKEY_FROM_FILE_METHODDEF \ + {"from_file", (PyCFunction)(void(*)(void))_ssl_PrivateKey_from_file, METH_FASTCALL|METH_KEYWORDS|METH_CLASS, _ssl_PrivateKey_from_file__doc__}, + +static PyObject * +_ssl_PrivateKey_from_file_impl(PyTypeObject *type, PyObject *path, + PyObject *password, int format); + +static PyObject * +_ssl_PrivateKey_from_file(PyTypeObject *type, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"path", "password", "format", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "from_file", 0}; + PyObject *argsbuf[3]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1; + PyObject *path; + PyObject *password = Py_None; + int format = PY_SSL_FILETYPE_PEM; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (!PyUnicode_FSConverter(args[0], &path)) { + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + if (args[1]) { + password = args[1]; + if (!--noptargs) { + goto skip_optional_kwonly; + } + } + format = _PyLong_AsInt(args[2]); + if (format == -1 && PyErr_Occurred()) { + goto exit; + } +skip_optional_kwonly: + return_value = _ssl_PrivateKey_from_file_impl(type, path, password, format); + +exit: + return return_value; +} + +PyDoc_STRVAR(_ssl_PrivateKey_from_buffer__doc__, +"from_buffer($type, /, buffer, *, password=None, format=FILETYPE_PEM)\n" +"--\n" +"\n"); + +#define _SSL_PRIVATEKEY_FROM_BUFFER_METHODDEF \ + {"from_buffer", (PyCFunction)(void(*)(void))_ssl_PrivateKey_from_buffer, METH_FASTCALL|METH_KEYWORDS|METH_CLASS, _ssl_PrivateKey_from_buffer__doc__}, + +static PyObject * +_ssl_PrivateKey_from_buffer_impl(PyTypeObject *type, Py_buffer *buffer, + PyObject *password, int format); + +static PyObject * +_ssl_PrivateKey_from_buffer(PyTypeObject *type, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"buffer", "password", "format", NULL}; + static _PyArg_Parser _parser = {NULL, _keywords, "from_buffer", 0}; + PyObject *argsbuf[3]; + Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1; + Py_buffer buffer = {NULL, NULL}; + PyObject *password = Py_None; + int format = PY_SSL_FILETYPE_PEM; + + args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser, 1, 1, 0, argsbuf); + if (!args) { + goto exit; + } + if (PyObject_GetBuffer(args[0], &buffer, PyBUF_SIMPLE) != 0) { + goto exit; + } + if (!PyBuffer_IsContiguous(&buffer, 'C')) { + _PyArg_BadArgument("from_buffer", "argument 'buffer'", "contiguous buffer", args[0]); + goto exit; + } + if (!noptargs) { + goto skip_optional_kwonly; + } + if (args[1]) { + password = args[1]; + if (!--noptargs) { + goto skip_optional_kwonly; + } + } + format = _PyLong_AsInt(args[2]); + if (format == -1 && PyErr_Occurred()) { + goto exit; + } +skip_optional_kwonly: + return_value = _ssl_PrivateKey_from_buffer_impl(type, &buffer, password, format); + +exit: + /* Cleanup for buffer */ + if (buffer.obj) { + PyBuffer_Release(&buffer); + } + + return return_value; +} +/*[clinic end generated code: output=0af76c275a830b38 input=a9049054013a1b77]*/ diff --git a/Modules/_ssl/clinic/truststore.c.h b/Modules/_ssl/clinic/truststore.c.h new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/Modules/_ssl/misc.c b/Modules/_ssl/misc.c new file mode 100644 index 00000000000000..c19e87401cae97 --- /dev/null +++ b/Modules/_ssl/misc.c @@ -0,0 +1,83 @@ +#include "Python.h" +#include "../_ssl.h" + +#include "openssl/bio.h" + +/* Create BIO from object(converter="PyUnicode_FSConverter") */ +static BIO * +_PySSL_filebio(_sslmodulestate *state, PyObject *path) +{ + BIO *bio = NULL; + int result; + + if ((bio = BIO_new(BIO_s_file())) == NULL) { + _setSSLError(state, "Can't allocate BIO", 0, __FILE__, __LINE__); + return NULL; + } + PySSL_BEGIN_ALLOW_THREADS + result = BIO_read_filename(bio, PyBytes_AsString(path)); + PySSL_END_ALLOW_THREADS + + if (result <= 0) { + _setSSLError(state, "Cannot read file", 0, __FILE__, __LINE__); + BIO_free(bio); + return NULL; + } + return bio; +} + +/* Create BIO from Py_buffer + * + * NOTE: The BIO uses the buffer directly. You must BIO_free(bio) first, + * then PyBuffer_Release(&b)! + */ +static BIO * +_PySSL_bufferbio(_sslmodulestate *state, Py_buffer *b) +{ + BIO *bio = NULL; + + if (b->len > INT_MAX) { + PyErr_Format(PyExc_OverflowError, + "buffer longer than %d bytes", INT_MAX); + return NULL; + } + + bio = BIO_new_mem_buf(b->buf, (int)b->len); + if (bio == NULL) { + _setSSLError(state, "Cannot allocate BIO", 0, __FILE__, __LINE__); + return NULL; + } + return bio; +} + +/* BIO_s_mem() to PyBytes + */ +static PyObject * +_PySSL_BytesFromBIO(_sslmodulestate *state, BIO *bio) +{ + long size; + char *data = NULL; + size = BIO_get_mem_data(bio, &data); + /* Only memory BIO set pointer for BIO_CTRL_INFO */ + if (data == NULL || size < 0) { + PyErr_SetString(PyExc_ValueError, "Not a memory BIO"); + return NULL; + } + return PyBytes_FromStringAndSize(data, size); +} + +/* BIO_s_mem() to PyUnicode + */ +static PyObject * +_PySSL_UnicodeFromBIO(_sslmodulestate *state, BIO *bio, const char *error) +{ + long size; + char *data = NULL; + size = BIO_get_mem_data(bio, &data); + /* Only memory BIO set pointer for BIO_CTRL_INFO */ + if (data == NULL || size < 0) { + PyErr_SetString(PyExc_ValueError, "Not a memory BIO"); + return NULL; + } + return PyUnicode_DecodeUTF8(data, size, error); +} diff --git a/Modules/_ssl/pkey.c b/Modules/_ssl/pkey.c new file mode 100644 index 00000000000000..bb023ad3d2f998 --- /dev/null +++ b/Modules/_ssl/pkey.c @@ -0,0 +1,161 @@ +#include "Python.h" +#include "../_ssl.h" + +#include "openssl/bio.h" +#include "openssl/evp.h" +#include "openssl/pem.h" + +/*[clinic input] +module _ssl +class _ssl.PrivateKey "PySSLPrivateKey *" "PySSLPrivateKey_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=46196dd92bd0a6d8]*/ + +#include "clinic/pkey.c.h" + +static PyObject * +newPrivateKey(PyTypeObject *type, EVP_PKEY *pkey, int upref) +{ + PySSLPrivateKey *self; + + assert(type != NULL && type->tp_alloc != NULL); + assert(pkey != NULL); + + self = (PySSLPrivateKey *) type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + if (upref == 1) { + EVP_PKEY_up_ref(pkey); + } + self->pkey = pkey; + + return (PyObject *) self; +} + +static EVP_PKEY * +read_pkey_bio(BIO *bio, int format, PySSLPasswordInfo *pw_info) +{ + EVP_PKEY *pkey = NULL; + + switch(format) { + case PY_SSL_FILETYPE_PEM: + PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state); + pkey = PEM_read_bio_PrivateKey(bio, NULL, PySSL_password_cb, pw_info); + PySSL_END_ALLOW_THREADS_S(pw_info->thread_state); + break; + case PY_SSL_FILETYPE_ASN1: + PySSL_BEGIN_ALLOW_THREADS_S(pw_info->thread_state); + pkey = d2i_PKCS8PrivateKey_bio(bio, NULL, PySSL_password_cb, pw_info); + PySSL_END_ALLOW_THREADS_S(pw_info->thread_state); + break; + default: + PyErr_SetString(PyExc_ValueError, "Invalid format"); + return NULL; + } + + if (pkey == NULL) { + PYSSL_PWINFO_ERROR(pw_info) + return NULL; + } + return pkey; +} + +/*[clinic input] +@classmethod +_ssl.PrivateKey.from_file + path: object(converter="PyUnicode_FSConverter") + * + password: object = None + format: int(c_default="PY_SSL_FILETYPE_PEM") = FILETYPE_PEM + +[clinic start generated code]*/ + +static PyObject * +_ssl_PrivateKey_from_file_impl(PyTypeObject *type, PyObject *path, + PyObject *password, int format) +/*[clinic end generated code: output=5dc7bfeda73c8b4b input=1f0112f77dded55b]*/ +{ + EVP_PKEY *pkey = NULL; + BIO *bio; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + + PYSSL_PWINFO_INIT(&pw_info, password, NULL) + + bio = _PySSL_filebio(path); + if (bio == NULL) { + return NULL; + } + pkey = read_pkey_bio(bio, format, &pw_info); + BIO_free(bio); + if (pkey == NULL) { + return NULL; + } + return newPrivateKey(type, pkey, 0); +} + +/*[clinic input] +@classmethod +_ssl.PrivateKey.from_buffer + buffer: Py_buffer + * + password: object = None + format: int(c_default="PY_SSL_FILETYPE_PEM") = FILETYPE_PEM +[clinic start generated code]*/ + +static PyObject * +_ssl_PrivateKey_from_buffer_impl(PyTypeObject *type, Py_buffer *buffer, + PyObject *password, int format) +/*[clinic end generated code: output=e6acef288f8eff17 input=a76a6549e5381124]*/ +{ + EVP_PKEY *pkey = NULL; + BIO *bio; + PySSLPasswordInfo pw_info = { NULL, NULL, NULL, 0, 0 }; + + PYSSL_PWINFO_INIT(&pw_info, password, NULL) + + bio = _PySSL_bufferbio(buffer); + if (bio == NULL) { + return NULL; + } + pkey = read_pkey_bio(bio, format, &pw_info); + BIO_free(bio); + if (pkey == NULL) { + return NULL; + } + return newPrivateKey(type, pkey, 0); +} + +static void +pkey_dealloc(PySSLPrivateKey *self) +{ + PyTypeObject *tp = Py_TYPE(self); + EVP_PKEY_free(self->pkey); + Py_TYPE(self)->tp_free(self); + Py_DECREF(tp); +} + +static PyGetSetDef pkey_getsetlist[] = { + {NULL} /* sentinel */ +}; + +static PyMethodDef pkey_methods[] = { + _SSL_PRIVATEKEY_FROM_FILE_METHODDEF + _SSL_PRIVATEKEY_FROM_BUFFER_METHODDEF + {NULL, NULL} +}; + +static PyType_Slot PySSLPrivateKey_slots[] = { + {Py_tp_dealloc, pkey_dealloc}, + {Py_tp_methods, pkey_methods}, + {Py_tp_getset, pkey_getsetlist}, + {0, 0}, +}; + +static PyType_Spec PySSLPrivateKey_spec = { + "_ssl.PrivateKey", + sizeof(PySSLPrivateKey), + 0, + Py_TPFLAGS_DEFAULT, + PySSLPrivateKey_slots, +}; diff --git a/Modules/_ssl/truststore.c b/Modules/_ssl/truststore.c new file mode 100644 index 00000000000000..1e283b2a7f4b49 --- /dev/null +++ b/Modules/_ssl/truststore.c @@ -0,0 +1,95 @@ +#include "Python.h" +#include "../_ssl.h" + +#include "openssl/x509_vfy.h" + + +/*[clinic input] +module _ssl +class _ssl.TrustStore "PySSLTrustStore *" "PySSLTrustStore_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=07c50a283a64cefa]*/ + +#include "clinic/truststore.c.h" + +static PyObject * +newTrustStore(PyTypeObject *type, X509_STORE *store, int upref) +{ + PySSLTrustStore *self; + + assert(type != NULL && type->tp_alloc != NULL); + assert(store != NULL); + + self = (PySSLTrustStore *) type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + if (upref == 1) { + X509_STORE_up_ref(store); + } + self->store = store; + self->hash_dirs = NULL; + + return (PyObject *) self; +} + +static PyObject * +_PySSL_TrustStoreFromStore(X509_STORE *store, int upref) +{ + return newTrustStore(PySSLTrustStore_Type, store, upref); +} + +static int +truststore_traverse(PySSLTrustStore *self, visitproc visit, void *arg) +{ + if (self->hash_dirs != NULL) { + Py_VISIT(self->hash_dirs); + } + return 0; +} + +static int +truststore_clear(PySSLTrustStore *self) +{ + if (self->hash_dirs != NULL) { + Py_CLEAR(self->hash_dirs); + } + return 0; +} + +static void +truststore_dealloc(PySSLTrustStore *self) +{ + PyTypeObject *tp = Py_TYPE(self); + PyObject_GC_UnTrack(self); + truststore_clear(self); + X509_STORE_free(self->store); + Py_TYPE(self)->tp_free(self); + Py_DECREF(tp); +} + +static PyGetSetDef truststore_getsetlist[] = { + {NULL} /* sentinel */ +}; + +static PyMethodDef truststore_methods[] = { + /* methods */ + {NULL, NULL} +}; + +static PyType_Slot truststore_slots[] = { + {Py_tp_dealloc, truststore_dealloc}, + {Py_tp_traverse, truststore_traverse}, + {Py_tp_clear, truststore_clear}, + {Py_tp_methods, truststore_methods}, + {Py_tp_getset, truststore_getsetlist}, + {0, 0}, +}; + +static PyType_Spec PySSLTrustStore_spec = { + "_ssl.TrustStore", + sizeof(PySSLTrustStore), + 0, + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, + truststore_slots, +}; diff --git a/Modules/clinic/_ssl.c.h b/Modules/clinic/_ssl.c.h index 45c3095a9f6f00..689229f9eb4917 100644 --- a/Modules/clinic/_ssl.c.h +++ b/Modules/clinic/_ssl.c.h @@ -88,6 +88,27 @@ _ssl__SSLSocket_getpeercert(PySSLSocket *self, PyObject *const *args, Py_ssize_t return return_value; } +#if (OPENSSL_VERSION_1_1) + +PyDoc_STRVAR(_ssl__SSLSocket_verified_chain__doc__, +"verified_chain($self, /)\n" +"--\n" +"\n"); + +#define _SSL__SSLSOCKET_VERIFIED_CHAIN_METHODDEF \ + {"verified_chain", (PyCFunction)_ssl__SSLSocket_verified_chain, METH_NOARGS, _ssl__SSLSocket_verified_chain__doc__}, + +static PyObject * +_ssl__SSLSocket_verified_chain_impl(PySSLSocket *self); + +static PyObject * +_ssl__SSLSocket_verified_chain(PySSLSocket *self, PyObject *Py_UNUSED(ignored)) +{ + return _ssl__SSLSocket_verified_chain_impl(self); +} + +#endif /* (OPENSSL_VERSION_1_1) */ + PyDoc_STRVAR(_ssl__SSLSocket_shared_ciphers__doc__, "shared_ciphers($self, /)\n" "--\n" @@ -1317,6 +1338,10 @@ _ssl_enum_crls(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObje #endif /* defined(_MSC_VER) */ +#ifndef _SSL__SSLSOCKET_VERIFIED_CHAIN_METHODDEF + #define _SSL__SSLSOCKET_VERIFIED_CHAIN_METHODDEF +#endif /* !defined(_SSL__SSLSOCKET_VERIFIED_CHAIN_METHODDEF) */ + #ifndef _SSL_ENUM_CERTIFICATES_METHODDEF #define _SSL_ENUM_CERTIFICATES_METHODDEF #endif /* !defined(_SSL_ENUM_CERTIFICATES_METHODDEF) */ @@ -1324,4 +1349,4 @@ _ssl_enum_crls(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObje #ifndef _SSL_ENUM_CRLS_METHODDEF #define _SSL_ENUM_CRLS_METHODDEF #endif /* !defined(_SSL_ENUM_CRLS_METHODDEF) */ -/*[clinic end generated code: output=83e68c77bd96789a input=a9049054013a1b77]*/ +/*[clinic end generated code: output=6ee33b5ee421372e input=a9049054013a1b77]*/ diff --git a/setup.py b/setup.py index af384409553eb8..f2dd7903da79d9 100644 --- a/setup.py +++ b/setup.py @@ -2461,7 +2461,15 @@ def split_var(name, sep): Extension( '_ssl', ['_ssl.c'], - depends=['socketmodule.h', '_ssl/debughelpers.c', '_ssl.h'], + depends=[ + 'socketmodule.h', + '_ssl.h', + '_ssl/debughelpers.c', + '_ssl/misc.c', + '_ssl/cert.c', + '_ssl/pkey.c', + '_ssl/truststore.c', + ], **openssl_extension_kwargs ) )