This commit is contained in:
2025-04-17 04:52:48 +08:00
commit 9985b73dc1
3708 changed files with 2387532 additions and 0 deletions

View File

@ -0,0 +1,430 @@
# type: ignore
"""
This module uses ctypes to bind a whole bunch of functions and constants from
SecureTransport. The goal here is to provide the low-level API to
SecureTransport. These are essentially the C-level functions and constants, and
they're pretty gross to work with.
This code is a bastardised version of the code found in Will Bond's oscrypto
library. An enormous debt is owed to him for blazing this trail for us. For
that reason, this code should be considered to be covered both by urllib3's
license and by oscrypto's:
Copyright (c) 2015-2016 Will Bond <will@wbond.net>
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import platform
from ctypes import (
CDLL,
CFUNCTYPE,
POINTER,
c_bool,
c_byte,
c_char_p,
c_int32,
c_long,
c_size_t,
c_uint32,
c_ulong,
c_void_p,
)
from ctypes.util import find_library
if platform.system() != "Darwin":
raise ImportError("Only macOS is supported")
version = platform.mac_ver()[0]
version_info = tuple(map(int, version.split(".")))
if version_info < (10, 8):
raise OSError(
f"Only OS X 10.8 and newer are supported, not {version_info[0]}.{version_info[1]}"
)
def load_cdll(name: str, macos10_16_path: str) -> CDLL:
"""Loads a CDLL by name, falling back to known path on 10.16+"""
try:
# Big Sur is technically 11 but we use 10.16 due to the Big Sur
# beta being labeled as 10.16.
path: str | None
if version_info >= (10, 16):
path = macos10_16_path
else:
path = find_library(name)
if not path:
raise OSError # Caught and reraised as 'ImportError'
return CDLL(path, use_errno=True)
except OSError:
raise ImportError(f"The library {name} failed to load") from None
Security = load_cdll(
"Security", "/System/Library/Frameworks/Security.framework/Security"
)
CoreFoundation = load_cdll(
"CoreFoundation",
"/System/Library/Frameworks/CoreFoundation.framework/CoreFoundation",
)
Boolean = c_bool
CFIndex = c_long
CFStringEncoding = c_uint32
CFData = c_void_p
CFString = c_void_p
CFArray = c_void_p
CFMutableArray = c_void_p
CFDictionary = c_void_p
CFError = c_void_p
CFType = c_void_p
CFTypeID = c_ulong
CFTypeRef = POINTER(CFType)
CFAllocatorRef = c_void_p
OSStatus = c_int32
CFDataRef = POINTER(CFData)
CFStringRef = POINTER(CFString)
CFArrayRef = POINTER(CFArray)
CFMutableArrayRef = POINTER(CFMutableArray)
CFDictionaryRef = POINTER(CFDictionary)
CFArrayCallBacks = c_void_p
CFDictionaryKeyCallBacks = c_void_p
CFDictionaryValueCallBacks = c_void_p
SecCertificateRef = POINTER(c_void_p)
SecExternalFormat = c_uint32
SecExternalItemType = c_uint32
SecIdentityRef = POINTER(c_void_p)
SecItemImportExportFlags = c_uint32
SecItemImportExportKeyParameters = c_void_p
SecKeychainRef = POINTER(c_void_p)
SSLProtocol = c_uint32
SSLCipherSuite = c_uint32
SSLContextRef = POINTER(c_void_p)
SecTrustRef = POINTER(c_void_p)
SSLConnectionRef = c_uint32
SecTrustResultType = c_uint32
SecTrustOptionFlags = c_uint32
SSLProtocolSide = c_uint32
SSLConnectionType = c_uint32
SSLSessionOption = c_uint32
try:
Security.SecItemImport.argtypes = [
CFDataRef,
CFStringRef,
POINTER(SecExternalFormat),
POINTER(SecExternalItemType),
SecItemImportExportFlags,
POINTER(SecItemImportExportKeyParameters),
SecKeychainRef,
POINTER(CFArrayRef),
]
Security.SecItemImport.restype = OSStatus
Security.SecCertificateGetTypeID.argtypes = []
Security.SecCertificateGetTypeID.restype = CFTypeID
Security.SecIdentityGetTypeID.argtypes = []
Security.SecIdentityGetTypeID.restype = CFTypeID
Security.SecKeyGetTypeID.argtypes = []
Security.SecKeyGetTypeID.restype = CFTypeID
Security.SecCertificateCreateWithData.argtypes = [CFAllocatorRef, CFDataRef]
Security.SecCertificateCreateWithData.restype = SecCertificateRef
Security.SecCertificateCopyData.argtypes = [SecCertificateRef]
Security.SecCertificateCopyData.restype = CFDataRef
Security.SecCopyErrorMessageString.argtypes = [OSStatus, c_void_p]
Security.SecCopyErrorMessageString.restype = CFStringRef
Security.SecIdentityCreateWithCertificate.argtypes = [
CFTypeRef,
SecCertificateRef,
POINTER(SecIdentityRef),
]
Security.SecIdentityCreateWithCertificate.restype = OSStatus
Security.SecKeychainCreate.argtypes = [
c_char_p,
c_uint32,
c_void_p,
Boolean,
c_void_p,
POINTER(SecKeychainRef),
]
Security.SecKeychainCreate.restype = OSStatus
Security.SecKeychainDelete.argtypes = [SecKeychainRef]
Security.SecKeychainDelete.restype = OSStatus
Security.SecPKCS12Import.argtypes = [
CFDataRef,
CFDictionaryRef,
POINTER(CFArrayRef),
]
Security.SecPKCS12Import.restype = OSStatus
SSLReadFunc = CFUNCTYPE(OSStatus, SSLConnectionRef, c_void_p, POINTER(c_size_t))
SSLWriteFunc = CFUNCTYPE(
OSStatus, SSLConnectionRef, POINTER(c_byte), POINTER(c_size_t)
)
Security.SSLSetIOFuncs.argtypes = [SSLContextRef, SSLReadFunc, SSLWriteFunc]
Security.SSLSetIOFuncs.restype = OSStatus
Security.SSLSetPeerID.argtypes = [SSLContextRef, c_char_p, c_size_t]
Security.SSLSetPeerID.restype = OSStatus
Security.SSLSetCertificate.argtypes = [SSLContextRef, CFArrayRef]
Security.SSLSetCertificate.restype = OSStatus
Security.SSLSetCertificateAuthorities.argtypes = [SSLContextRef, CFTypeRef, Boolean]
Security.SSLSetCertificateAuthorities.restype = OSStatus
Security.SSLSetConnection.argtypes = [SSLContextRef, SSLConnectionRef]
Security.SSLSetConnection.restype = OSStatus
Security.SSLSetPeerDomainName.argtypes = [SSLContextRef, c_char_p, c_size_t]
Security.SSLSetPeerDomainName.restype = OSStatus
Security.SSLHandshake.argtypes = [SSLContextRef]
Security.SSLHandshake.restype = OSStatus
Security.SSLRead.argtypes = [SSLContextRef, c_char_p, c_size_t, POINTER(c_size_t)]
Security.SSLRead.restype = OSStatus
Security.SSLWrite.argtypes = [SSLContextRef, c_char_p, c_size_t, POINTER(c_size_t)]
Security.SSLWrite.restype = OSStatus
Security.SSLClose.argtypes = [SSLContextRef]
Security.SSLClose.restype = OSStatus
Security.SSLGetNumberSupportedCiphers.argtypes = [SSLContextRef, POINTER(c_size_t)]
Security.SSLGetNumberSupportedCiphers.restype = OSStatus
Security.SSLGetSupportedCiphers.argtypes = [
SSLContextRef,
POINTER(SSLCipherSuite),
POINTER(c_size_t),
]
Security.SSLGetSupportedCiphers.restype = OSStatus
Security.SSLSetEnabledCiphers.argtypes = [
SSLContextRef,
POINTER(SSLCipherSuite),
c_size_t,
]
Security.SSLSetEnabledCiphers.restype = OSStatus
Security.SSLGetNumberEnabledCiphers.argtype = [SSLContextRef, POINTER(c_size_t)]
Security.SSLGetNumberEnabledCiphers.restype = OSStatus
Security.SSLGetEnabledCiphers.argtypes = [
SSLContextRef,
POINTER(SSLCipherSuite),
POINTER(c_size_t),
]
Security.SSLGetEnabledCiphers.restype = OSStatus
Security.SSLGetNegotiatedCipher.argtypes = [SSLContextRef, POINTER(SSLCipherSuite)]
Security.SSLGetNegotiatedCipher.restype = OSStatus
Security.SSLGetNegotiatedProtocolVersion.argtypes = [
SSLContextRef,
POINTER(SSLProtocol),
]
Security.SSLGetNegotiatedProtocolVersion.restype = OSStatus
Security.SSLCopyPeerTrust.argtypes = [SSLContextRef, POINTER(SecTrustRef)]
Security.SSLCopyPeerTrust.restype = OSStatus
Security.SecTrustSetAnchorCertificates.argtypes = [SecTrustRef, CFArrayRef]
Security.SecTrustSetAnchorCertificates.restype = OSStatus
Security.SecTrustSetAnchorCertificatesOnly.argstypes = [SecTrustRef, Boolean]
Security.SecTrustSetAnchorCertificatesOnly.restype = OSStatus
Security.SecTrustEvaluate.argtypes = [SecTrustRef, POINTER(SecTrustResultType)]
Security.SecTrustEvaluate.restype = OSStatus
Security.SecTrustGetCertificateCount.argtypes = [SecTrustRef]
Security.SecTrustGetCertificateCount.restype = CFIndex
Security.SecTrustGetCertificateAtIndex.argtypes = [SecTrustRef, CFIndex]
Security.SecTrustGetCertificateAtIndex.restype = SecCertificateRef
Security.SSLCreateContext.argtypes = [
CFAllocatorRef,
SSLProtocolSide,
SSLConnectionType,
]
Security.SSLCreateContext.restype = SSLContextRef
Security.SSLSetSessionOption.argtypes = [SSLContextRef, SSLSessionOption, Boolean]
Security.SSLSetSessionOption.restype = OSStatus
Security.SSLSetProtocolVersionMin.argtypes = [SSLContextRef, SSLProtocol]
Security.SSLSetProtocolVersionMin.restype = OSStatus
Security.SSLSetProtocolVersionMax.argtypes = [SSLContextRef, SSLProtocol]
Security.SSLSetProtocolVersionMax.restype = OSStatus
try:
Security.SSLSetALPNProtocols.argtypes = [SSLContextRef, CFArrayRef]
Security.SSLSetALPNProtocols.restype = OSStatus
except AttributeError:
# Supported only in 10.12+
pass
Security.SecCopyErrorMessageString.argtypes = [OSStatus, c_void_p]
Security.SecCopyErrorMessageString.restype = CFStringRef
Security.SSLReadFunc = SSLReadFunc
Security.SSLWriteFunc = SSLWriteFunc
Security.SSLContextRef = SSLContextRef
Security.SSLProtocol = SSLProtocol
Security.SSLCipherSuite = SSLCipherSuite
Security.SecIdentityRef = SecIdentityRef
Security.SecKeychainRef = SecKeychainRef
Security.SecTrustRef = SecTrustRef
Security.SecTrustResultType = SecTrustResultType
Security.SecExternalFormat = SecExternalFormat
Security.OSStatus = OSStatus
Security.kSecImportExportPassphrase = CFStringRef.in_dll(
Security, "kSecImportExportPassphrase"
)
Security.kSecImportItemIdentity = CFStringRef.in_dll(
Security, "kSecImportItemIdentity"
)
# CoreFoundation time!
CoreFoundation.CFRetain.argtypes = [CFTypeRef]
CoreFoundation.CFRetain.restype = CFTypeRef
CoreFoundation.CFRelease.argtypes = [CFTypeRef]
CoreFoundation.CFRelease.restype = None
CoreFoundation.CFGetTypeID.argtypes = [CFTypeRef]
CoreFoundation.CFGetTypeID.restype = CFTypeID
CoreFoundation.CFStringCreateWithCString.argtypes = [
CFAllocatorRef,
c_char_p,
CFStringEncoding,
]
CoreFoundation.CFStringCreateWithCString.restype = CFStringRef
CoreFoundation.CFStringGetCStringPtr.argtypes = [CFStringRef, CFStringEncoding]
CoreFoundation.CFStringGetCStringPtr.restype = c_char_p
CoreFoundation.CFStringGetCString.argtypes = [
CFStringRef,
c_char_p,
CFIndex,
CFStringEncoding,
]
CoreFoundation.CFStringGetCString.restype = c_bool
CoreFoundation.CFDataCreate.argtypes = [CFAllocatorRef, c_char_p, CFIndex]
CoreFoundation.CFDataCreate.restype = CFDataRef
CoreFoundation.CFDataGetLength.argtypes = [CFDataRef]
CoreFoundation.CFDataGetLength.restype = CFIndex
CoreFoundation.CFDataGetBytePtr.argtypes = [CFDataRef]
CoreFoundation.CFDataGetBytePtr.restype = c_void_p
CoreFoundation.CFDictionaryCreate.argtypes = [
CFAllocatorRef,
POINTER(CFTypeRef),
POINTER(CFTypeRef),
CFIndex,
CFDictionaryKeyCallBacks,
CFDictionaryValueCallBacks,
]
CoreFoundation.CFDictionaryCreate.restype = CFDictionaryRef
CoreFoundation.CFDictionaryGetValue.argtypes = [CFDictionaryRef, CFTypeRef]
CoreFoundation.CFDictionaryGetValue.restype = CFTypeRef
CoreFoundation.CFArrayCreate.argtypes = [
CFAllocatorRef,
POINTER(CFTypeRef),
CFIndex,
CFArrayCallBacks,
]
CoreFoundation.CFArrayCreate.restype = CFArrayRef
CoreFoundation.CFArrayCreateMutable.argtypes = [
CFAllocatorRef,
CFIndex,
CFArrayCallBacks,
]
CoreFoundation.CFArrayCreateMutable.restype = CFMutableArrayRef
CoreFoundation.CFArrayAppendValue.argtypes = [CFMutableArrayRef, c_void_p]
CoreFoundation.CFArrayAppendValue.restype = None
CoreFoundation.CFArrayGetCount.argtypes = [CFArrayRef]
CoreFoundation.CFArrayGetCount.restype = CFIndex
CoreFoundation.CFArrayGetValueAtIndex.argtypes = [CFArrayRef, CFIndex]
CoreFoundation.CFArrayGetValueAtIndex.restype = c_void_p
CoreFoundation.kCFAllocatorDefault = CFAllocatorRef.in_dll(
CoreFoundation, "kCFAllocatorDefault"
)
CoreFoundation.kCFTypeArrayCallBacks = c_void_p.in_dll(
CoreFoundation, "kCFTypeArrayCallBacks"
)
CoreFoundation.kCFTypeDictionaryKeyCallBacks = c_void_p.in_dll(
CoreFoundation, "kCFTypeDictionaryKeyCallBacks"
)
CoreFoundation.kCFTypeDictionaryValueCallBacks = c_void_p.in_dll(
CoreFoundation, "kCFTypeDictionaryValueCallBacks"
)
CoreFoundation.CFTypeRef = CFTypeRef
CoreFoundation.CFArrayRef = CFArrayRef
CoreFoundation.CFStringRef = CFStringRef
CoreFoundation.CFDictionaryRef = CFDictionaryRef
except AttributeError:
raise ImportError("Error initializing ctypes") from None
class CFConst:
"""
A class object that acts as essentially a namespace for CoreFoundation
constants.
"""
kCFStringEncodingUTF8 = CFStringEncoding(0x08000100)

View File

@ -0,0 +1,474 @@
"""
Low-level helpers for the SecureTransport bindings.
These are Python functions that are not directly related to the high-level APIs
but are necessary to get them to work. They include a whole bunch of low-level
CoreFoundation messing about and memory management. The concerns in this module
are almost entirely about trying to avoid memory leaks and providing
appropriate and useful assistance to the higher-level code.
"""
from __future__ import annotations
import base64
import ctypes
import itertools
import os
import re
import ssl
import struct
import tempfile
import typing
from .bindings import ( # type: ignore[attr-defined]
CFArray,
CFConst,
CFData,
CFDictionary,
CFMutableArray,
CFString,
CFTypeRef,
CoreFoundation,
SecKeychainRef,
Security,
)
# This regular expression is used to grab PEM data out of a PEM bundle.
_PEM_CERTS_RE = re.compile(
b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----", re.DOTALL
)
def _cf_data_from_bytes(bytestring: bytes) -> CFData:
"""
Given a bytestring, create a CFData object from it. This CFData object must
be CFReleased by the caller.
"""
return CoreFoundation.CFDataCreate(
CoreFoundation.kCFAllocatorDefault, bytestring, len(bytestring)
)
def _cf_dictionary_from_tuples(
tuples: list[tuple[typing.Any, typing.Any]]
) -> CFDictionary:
"""
Given a list of Python tuples, create an associated CFDictionary.
"""
dictionary_size = len(tuples)
# We need to get the dictionary keys and values out in the same order.
keys = (t[0] for t in tuples)
values = (t[1] for t in tuples)
cf_keys = (CoreFoundation.CFTypeRef * dictionary_size)(*keys)
cf_values = (CoreFoundation.CFTypeRef * dictionary_size)(*values)
return CoreFoundation.CFDictionaryCreate(
CoreFoundation.kCFAllocatorDefault,
cf_keys,
cf_values,
dictionary_size,
CoreFoundation.kCFTypeDictionaryKeyCallBacks,
CoreFoundation.kCFTypeDictionaryValueCallBacks,
)
def _cfstr(py_bstr: bytes) -> CFString:
"""
Given a Python binary data, create a CFString.
The string must be CFReleased by the caller.
"""
c_str = ctypes.c_char_p(py_bstr)
cf_str = CoreFoundation.CFStringCreateWithCString(
CoreFoundation.kCFAllocatorDefault,
c_str,
CFConst.kCFStringEncodingUTF8,
)
return cf_str
def _create_cfstring_array(lst: list[bytes]) -> CFMutableArray:
"""
Given a list of Python binary data, create an associated CFMutableArray.
The array must be CFReleased by the caller.
Raises an ssl.SSLError on failure.
"""
cf_arr = None
try:
cf_arr = CoreFoundation.CFArrayCreateMutable(
CoreFoundation.kCFAllocatorDefault,
0,
ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
)
if not cf_arr:
raise MemoryError("Unable to allocate memory!")
for item in lst:
cf_str = _cfstr(item)
if not cf_str:
raise MemoryError("Unable to allocate memory!")
try:
CoreFoundation.CFArrayAppendValue(cf_arr, cf_str)
finally:
CoreFoundation.CFRelease(cf_str)
except BaseException as e:
if cf_arr:
CoreFoundation.CFRelease(cf_arr)
raise ssl.SSLError(f"Unable to allocate array: {e}") from None
return cf_arr
def _cf_string_to_unicode(value: CFString) -> str | None:
"""
Creates a Unicode string from a CFString object. Used entirely for error
reporting.
Yes, it annoys me quite a lot that this function is this complex.
"""
value_as_void_p = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))
string = CoreFoundation.CFStringGetCStringPtr(
value_as_void_p, CFConst.kCFStringEncodingUTF8
)
if string is None:
buffer = ctypes.create_string_buffer(1024)
result = CoreFoundation.CFStringGetCString(
value_as_void_p, buffer, 1024, CFConst.kCFStringEncodingUTF8
)
if not result:
raise OSError("Error copying C string from CFStringRef")
string = buffer.value
if string is not None:
string = string.decode("utf-8")
return string # type: ignore[no-any-return]
def _assert_no_error(
error: int, exception_class: type[BaseException] | None = None
) -> None:
"""
Checks the return code and throws an exception if there is an error to
report
"""
if error == 0:
return
cf_error_string = Security.SecCopyErrorMessageString(error, None)
output = _cf_string_to_unicode(cf_error_string)
CoreFoundation.CFRelease(cf_error_string)
if output is None or output == "":
output = f"OSStatus {error}"
if exception_class is None:
exception_class = ssl.SSLError
raise exception_class(output)
def _cert_array_from_pem(pem_bundle: bytes) -> CFArray:
"""
Given a bundle of certs in PEM format, turns them into a CFArray of certs
that can be used to validate a cert chain.
"""
# Normalize the PEM bundle's line endings.
pem_bundle = pem_bundle.replace(b"\r\n", b"\n")
der_certs = [
base64.b64decode(match.group(1)) for match in _PEM_CERTS_RE.finditer(pem_bundle)
]
if not der_certs:
raise ssl.SSLError("No root certificates specified")
cert_array = CoreFoundation.CFArrayCreateMutable(
CoreFoundation.kCFAllocatorDefault,
0,
ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
)
if not cert_array:
raise ssl.SSLError("Unable to allocate memory!")
try:
for der_bytes in der_certs:
certdata = _cf_data_from_bytes(der_bytes)
if not certdata:
raise ssl.SSLError("Unable to allocate memory!")
cert = Security.SecCertificateCreateWithData(
CoreFoundation.kCFAllocatorDefault, certdata
)
CoreFoundation.CFRelease(certdata)
if not cert:
raise ssl.SSLError("Unable to build cert object!")
CoreFoundation.CFArrayAppendValue(cert_array, cert)
CoreFoundation.CFRelease(cert)
except Exception:
# We need to free the array before the exception bubbles further.
# We only want to do that if an error occurs: otherwise, the caller
# should free.
CoreFoundation.CFRelease(cert_array)
raise
return cert_array
def _is_cert(item: CFTypeRef) -> bool:
"""
Returns True if a given CFTypeRef is a certificate.
"""
expected = Security.SecCertificateGetTypeID()
return CoreFoundation.CFGetTypeID(item) == expected # type: ignore[no-any-return]
def _is_identity(item: CFTypeRef) -> bool:
"""
Returns True if a given CFTypeRef is an identity.
"""
expected = Security.SecIdentityGetTypeID()
return CoreFoundation.CFGetTypeID(item) == expected # type: ignore[no-any-return]
def _temporary_keychain() -> tuple[SecKeychainRef, str]:
"""
This function creates a temporary Mac keychain that we can use to work with
credentials. This keychain uses a one-time password and a temporary file to
store the data. We expect to have one keychain per socket. The returned
SecKeychainRef must be freed by the caller, including calling
SecKeychainDelete.
Returns a tuple of the SecKeychainRef and the path to the temporary
directory that contains it.
"""
# Unfortunately, SecKeychainCreate requires a path to a keychain. This
# means we cannot use mkstemp to use a generic temporary file. Instead,
# we're going to create a temporary directory and a filename to use there.
# This filename will be 8 random bytes expanded into base64. We also need
# some random bytes to password-protect the keychain we're creating, so we
# ask for 40 random bytes.
random_bytes = os.urandom(40)
filename = base64.b16encode(random_bytes[:8]).decode("utf-8")
password = base64.b16encode(random_bytes[8:]) # Must be valid UTF-8
tempdirectory = tempfile.mkdtemp()
keychain_path = os.path.join(tempdirectory, filename).encode("utf-8")
# We now want to create the keychain itself.
keychain = Security.SecKeychainRef()
status = Security.SecKeychainCreate(
keychain_path, len(password), password, False, None, ctypes.byref(keychain)
)
_assert_no_error(status)
# Having created the keychain, we want to pass it off to the caller.
return keychain, tempdirectory
def _load_items_from_file(
keychain: SecKeychainRef, path: str
) -> tuple[list[CFTypeRef], list[CFTypeRef]]:
"""
Given a single file, loads all the trust objects from it into arrays and
the keychain.
Returns a tuple of lists: the first list is a list of identities, the
second a list of certs.
"""
certificates = []
identities = []
result_array = None
with open(path, "rb") as f:
raw_filedata = f.read()
try:
filedata = CoreFoundation.CFDataCreate(
CoreFoundation.kCFAllocatorDefault, raw_filedata, len(raw_filedata)
)
result_array = CoreFoundation.CFArrayRef()
result = Security.SecItemImport(
filedata, # cert data
None, # Filename, leaving it out for now
None, # What the type of the file is, we don't care
None, # what's in the file, we don't care
0, # import flags
None, # key params, can include passphrase in the future
keychain, # The keychain to insert into
ctypes.byref(result_array), # Results
)
_assert_no_error(result)
# A CFArray is not very useful to us as an intermediary
# representation, so we are going to extract the objects we want
# and then free the array. We don't need to keep hold of keys: the
# keychain already has them!
result_count = CoreFoundation.CFArrayGetCount(result_array)
for index in range(result_count):
item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index)
item = ctypes.cast(item, CoreFoundation.CFTypeRef)
if _is_cert(item):
CoreFoundation.CFRetain(item)
certificates.append(item)
elif _is_identity(item):
CoreFoundation.CFRetain(item)
identities.append(item)
finally:
if result_array:
CoreFoundation.CFRelease(result_array)
CoreFoundation.CFRelease(filedata)
return (identities, certificates)
def _load_client_cert_chain(keychain: SecKeychainRef, *paths: str | None) -> CFArray:
"""
Load certificates and maybe keys from a number of files. Has the end goal
of returning a CFArray containing one SecIdentityRef, and then zero or more
SecCertificateRef objects, suitable for use as a client certificate trust
chain.
"""
# Ok, the strategy.
#
# This relies on knowing that macOS will not give you a SecIdentityRef
# unless you have imported a key into a keychain. This is a somewhat
# artificial limitation of macOS (for example, it doesn't necessarily
# affect iOS), but there is nothing inside Security.framework that lets you
# get a SecIdentityRef without having a key in a keychain.
#
# So the policy here is we take all the files and iterate them in order.
# Each one will use SecItemImport to have one or more objects loaded from
# it. We will also point at a keychain that macOS can use to work with the
# private key.
#
# Once we have all the objects, we'll check what we actually have. If we
# already have a SecIdentityRef in hand, fab: we'll use that. Otherwise,
# we'll take the first certificate (which we assume to be our leaf) and
# ask the keychain to give us a SecIdentityRef with that cert's associated
# key.
#
# We'll then return a CFArray containing the trust chain: one
# SecIdentityRef and then zero-or-more SecCertificateRef objects. The
# responsibility for freeing this CFArray will be with the caller. This
# CFArray must remain alive for the entire connection, so in practice it
# will be stored with a single SSLSocket, along with the reference to the
# keychain.
certificates = []
identities = []
# Filter out bad paths.
filtered_paths = (path for path in paths if path)
try:
for file_path in filtered_paths:
new_identities, new_certs = _load_items_from_file(keychain, file_path)
identities.extend(new_identities)
certificates.extend(new_certs)
# Ok, we have everything. The question is: do we have an identity? If
# not, we want to grab one from the first cert we have.
if not identities:
new_identity = Security.SecIdentityRef()
status = Security.SecIdentityCreateWithCertificate(
keychain, certificates[0], ctypes.byref(new_identity)
)
_assert_no_error(status)
identities.append(new_identity)
# We now want to release the original certificate, as we no longer
# need it.
CoreFoundation.CFRelease(certificates.pop(0))
# We now need to build a new CFArray that holds the trust chain.
trust_chain = CoreFoundation.CFArrayCreateMutable(
CoreFoundation.kCFAllocatorDefault,
0,
ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
)
for item in itertools.chain(identities, certificates):
# ArrayAppendValue does a CFRetain on the item. That's fine,
# because the finally block will release our other refs to them.
CoreFoundation.CFArrayAppendValue(trust_chain, item)
return trust_chain
finally:
for obj in itertools.chain(identities, certificates):
CoreFoundation.CFRelease(obj)
TLS_PROTOCOL_VERSIONS = {
"SSLv2": (0, 2),
"SSLv3": (3, 0),
"TLSv1": (3, 1),
"TLSv1.1": (3, 2),
"TLSv1.2": (3, 3),
}
def _build_tls_unknown_ca_alert(version: str) -> bytes:
"""
Builds a TLS alert record for an unknown CA.
"""
ver_maj, ver_min = TLS_PROTOCOL_VERSIONS[version]
severity_fatal = 0x02
description_unknown_ca = 0x30
msg = struct.pack(">BB", severity_fatal, description_unknown_ca)
msg_len = len(msg)
record_type_alert = 0x15
record = struct.pack(">BBBH", record_type_alert, ver_maj, ver_min, msg_len) + msg
return record
class SecurityConst:
"""
A class object that acts as essentially a namespace for Security constants.
"""
kSSLSessionOptionBreakOnServerAuth = 0
kSSLProtocol2 = 1
kSSLProtocol3 = 2
kTLSProtocol1 = 4
kTLSProtocol11 = 7
kTLSProtocol12 = 8
# SecureTransport does not support TLS 1.3 even if there's a constant for it
kTLSProtocol13 = 10
kTLSProtocolMaxSupported = 999
kSSLClientSide = 1
kSSLStreamType = 0
kSecFormatPEMSequence = 10
kSecTrustResultInvalid = 0
kSecTrustResultProceed = 1
# This gap is present on purpose: this was kSecTrustResultConfirm, which
# is deprecated.
kSecTrustResultDeny = 3
kSecTrustResultUnspecified = 4
kSecTrustResultRecoverableTrustFailure = 5
kSecTrustResultFatalTrustFailure = 6
kSecTrustResultOtherError = 7
errSSLProtocol = -9800
errSSLWouldBlock = -9803
errSSLClosedGraceful = -9805
errSSLClosedNoNotify = -9816
errSSLClosedAbort = -9806
errSSLXCertChainInvalid = -9807
errSSLCrypto = -9809
errSSLInternal = -9810
errSSLCertExpired = -9814
errSSLCertNotYetValid = -9815
errSSLUnknownRootCert = -9812
errSSLNoRootCert = -9813
errSSLHostNameMismatch = -9843
errSSLPeerHandshakeFail = -9824
errSSLPeerUserCancelled = -9839
errSSLWeakPeerEphemeralDHKey = -9850
errSSLServerAuthCompleted = -9841
errSSLRecordOverflow = -9847
errSecVerifyFailed = -67808
errSecNoTrustSettings = -25263
errSecItemNotFound = -25300
errSecInvalidTrustSettings = -25262

View File

@ -0,0 +1,548 @@
"""
Module for using pyOpenSSL as a TLS backend. This module was relevant before
the standard library ``ssl`` module supported SNI, but now that we've dropped
support for Python 2.7 all relevant Python versions support SNI so
**this module is no longer recommended**.
This needs the following packages installed:
* `pyOpenSSL`_ (tested with 16.0.0)
* `cryptography`_ (minimum 1.3.4, from pyopenssl)
* `idna`_ (minimum 2.0, from cryptography)
However, pyOpenSSL depends on cryptography, which depends on idna, so while we
use all three directly here we end up having relatively few packages required.
You can install them with the following command:
.. code-block:: bash
$ python -m pip install pyopenssl cryptography idna
To activate certificate checking, call
:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code
before you begin making HTTP requests. This can be done in a ``sitecustomize``
module, or at any other time before your application begins using ``urllib3``,
like this:
.. code-block:: python
try:
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()
except ImportError:
pass
.. _pyopenssl: https://www.pyopenssl.org
.. _cryptography: https://cryptography.io
.. _idna: https://github.com/kjd/idna
"""
from __future__ import annotations
import OpenSSL.SSL # type: ignore[import]
from cryptography import x509
try:
from cryptography.x509 import UnsupportedExtension # type: ignore[attr-defined]
except ImportError:
# UnsupportedExtension is gone in cryptography >= 2.1.0
class UnsupportedExtension(Exception): # type: ignore[no-redef]
pass
import logging
import ssl
import typing
from io import BytesIO
from socket import socket as socket_cls
from socket import timeout
from .. import util
if typing.TYPE_CHECKING:
from OpenSSL.crypto import X509 # type: ignore[import]
__all__ = ["inject_into_urllib3", "extract_from_urllib3"]
# Map from urllib3 to PyOpenSSL compatible parameter-values.
_openssl_versions = {
util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined]
util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD, # type: ignore[attr-defined]
ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD,
}
if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"):
_openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD
if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"):
_openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD
_stdlib_to_openssl_verify = {
ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE,
ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER,
ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER
+ OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
}
_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()}
# The SSLvX values are the most likely to be missing in the future
# but we check them all just to be sure.
_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
OpenSSL.SSL, "OP_NO_SSLv3", 0
)
_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)
_openssl_to_ssl_minimum_version: dict[int, int] = {
ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
ssl.TLSVersion.TLSv1_3: (
_OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
),
ssl.TLSVersion.MAXIMUM_SUPPORTED: (
_OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
),
}
_openssl_to_ssl_maximum_version: dict[int, int] = {
ssl.TLSVersion.MINIMUM_SUPPORTED: (
_OP_NO_SSLv2_OR_SSLv3
| _OP_NO_TLSv1
| _OP_NO_TLSv1_1
| _OP_NO_TLSv1_2
| _OP_NO_TLSv1_3
),
ssl.TLSVersion.TLSv1: (
_OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
),
ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
}
# OpenSSL will only write 16K at a time
SSL_WRITE_BLOCKSIZE = 16384
orig_util_SSLContext = util.ssl_.SSLContext
log = logging.getLogger(__name__)
def inject_into_urllib3() -> None:
"Monkey-patch urllib3 with PyOpenSSL-backed SSL-support."
_validate_dependencies_met()
util.SSLContext = PyOpenSSLContext # type: ignore[assignment]
util.ssl_.SSLContext = PyOpenSSLContext # type: ignore[assignment]
util.IS_PYOPENSSL = True
util.ssl_.IS_PYOPENSSL = True
def extract_from_urllib3() -> None:
"Undo monkey-patching by :func:`inject_into_urllib3`."
util.SSLContext = orig_util_SSLContext
util.ssl_.SSLContext = orig_util_SSLContext
util.IS_PYOPENSSL = False
util.ssl_.IS_PYOPENSSL = False
def _validate_dependencies_met() -> None:
"""
Verifies that PyOpenSSL's package-level dependencies have been met.
Throws `ImportError` if they are not met.
"""
# Method added in `cryptography==1.1`; not available in older versions
from cryptography.x509.extensions import Extensions
if getattr(Extensions, "get_extension_for_class", None) is None:
raise ImportError(
"'cryptography' module missing required functionality. "
"Try upgrading to v1.3.4 or newer."
)
# pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509
# attribute is only present on those versions.
from OpenSSL.crypto import X509
x509 = X509()
if getattr(x509, "_x509", None) is None:
raise ImportError(
"'pyOpenSSL' module missing required functionality. "
"Try upgrading to v0.14 or newer."
)
def _dnsname_to_stdlib(name: str) -> str | None:
"""
Converts a dNSName SubjectAlternativeName field to the form used by the
standard library on the given Python version.
Cryptography produces a dNSName as a unicode string that was idna-decoded
from ASCII bytes. We need to idna-encode that string to get it back, and
then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib
uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8).
If the name cannot be idna-encoded then we return None signalling that
the name given should be skipped.
"""
def idna_encode(name: str) -> bytes | None:
"""
Borrowed wholesale from the Python Cryptography Project. It turns out
that we can't just safely call `idna.encode`: it can explode for
wildcard names. This avoids that problem.
"""
import idna
try:
for prefix in ["*.", "."]:
if name.startswith(prefix):
name = name[len(prefix) :]
return prefix.encode("ascii") + idna.encode(name)
return idna.encode(name)
except idna.core.IDNAError:
return None
# Don't send IPv6 addresses through the IDNA encoder.
if ":" in name:
return name
encoded_name = idna_encode(name)
if encoded_name is None:
return None
return encoded_name.decode("utf-8")
def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]:
"""
Given an PyOpenSSL certificate, provides all the subject alternative names.
"""
cert = peer_cert.to_cryptography()
# We want to find the SAN extension. Ask Cryptography to locate it (it's
# faster than looping in Python)
try:
ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
except x509.ExtensionNotFound:
# No such extension, return the empty list.
return []
except (
x509.DuplicateExtension,
UnsupportedExtension,
x509.UnsupportedGeneralNameType,
UnicodeError,
) as e:
# A problem has been found with the quality of the certificate. Assume
# no SAN field is present.
log.warning(
"A problem was encountered with the certificate that prevented "
"urllib3 from finding the SubjectAlternativeName field. This can "
"affect certificate validation. The error was %s",
e,
)
return []
# We want to return dNSName and iPAddress fields. We need to cast the IPs
# back to strings because the match_hostname function wants them as
# strings.
# Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8
# decoded. This is pretty frustrating, but that's what the standard library
# does with certificates, and so we need to attempt to do the same.
# We also want to skip over names which cannot be idna encoded.
names = [
("DNS", name)
for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName))
if name is not None
]
names.extend(
("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress)
)
return names
class WrappedSocket:
"""API-compatibility wrapper for Python OpenSSL's Connection-class."""
def __init__(
self,
connection: OpenSSL.SSL.Connection,
socket: socket_cls,
suppress_ragged_eofs: bool = True,
) -> None:
self.connection = connection
self.socket = socket
self.suppress_ragged_eofs = suppress_ragged_eofs
self._io_refs = 0
self._closed = False
def fileno(self) -> int:
return self.socket.fileno()
# Copy-pasted from Python 3.5 source code
def _decref_socketios(self) -> None:
if self._io_refs > 0:
self._io_refs -= 1
if self._closed:
self.close()
def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes:
try:
data = self.connection.recv(*args, **kwargs)
except OpenSSL.SSL.SysCallError as e:
if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
return b""
else:
raise OSError(e.args[0], str(e)) from e
except OpenSSL.SSL.ZeroReturnError:
if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
return b""
else:
raise
except OpenSSL.SSL.WantReadError as e:
if not util.wait_for_read(self.socket, self.socket.gettimeout()):
raise timeout("The read operation timed out") from e
else:
return self.recv(*args, **kwargs)
# TLS 1.3 post-handshake authentication
except OpenSSL.SSL.Error as e:
raise ssl.SSLError(f"read error: {e!r}") from e
else:
return data # type: ignore[no-any-return]
def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int:
try:
return self.connection.recv_into(*args, **kwargs) # type: ignore[no-any-return]
except OpenSSL.SSL.SysCallError as e:
if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"):
return 0
else:
raise OSError(e.args[0], str(e)) from e
except OpenSSL.SSL.ZeroReturnError:
if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN:
return 0
else:
raise
except OpenSSL.SSL.WantReadError as e:
if not util.wait_for_read(self.socket, self.socket.gettimeout()):
raise timeout("The read operation timed out") from e
else:
return self.recv_into(*args, **kwargs)
# TLS 1.3 post-handshake authentication
except OpenSSL.SSL.Error as e:
raise ssl.SSLError(f"read error: {e!r}") from e
def settimeout(self, timeout: float) -> None:
return self.socket.settimeout(timeout)
def _send_until_done(self, data: bytes) -> int:
while True:
try:
return self.connection.send(data) # type: ignore[no-any-return]
except OpenSSL.SSL.WantWriteError as e:
if not util.wait_for_write(self.socket, self.socket.gettimeout()):
raise timeout() from e
continue
except OpenSSL.SSL.SysCallError as e:
raise OSError(e.args[0], str(e)) from e
def sendall(self, data: bytes) -> None:
total_sent = 0
while total_sent < len(data):
sent = self._send_until_done(
data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE]
)
total_sent += sent
def shutdown(self) -> None:
# FIXME rethrow compatible exceptions should we ever use this
self.connection.shutdown()
def close(self) -> None:
self._closed = True
if self._io_refs <= 0:
self._real_close()
def _real_close(self) -> None:
try:
return self.connection.close() # type: ignore[no-any-return]
except OpenSSL.SSL.Error:
return
def getpeercert(
self, binary_form: bool = False
) -> dict[str, list[typing.Any]] | None:
x509 = self.connection.get_peer_certificate()
if not x509:
return x509 # type: ignore[no-any-return]
if binary_form:
return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509) # type: ignore[no-any-return]
return {
"subject": ((("commonName", x509.get_subject().CN),),), # type: ignore[dict-item]
"subjectAltName": get_subj_alt_name(x509),
}
def version(self) -> str:
return self.connection.get_protocol_version_name() # type: ignore[no-any-return]
WrappedSocket.makefile = socket_cls.makefile # type: ignore[attr-defined]
class PyOpenSSLContext:
"""
I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible
for translating the interface of the standard library ``SSLContext`` object
to calls into PyOpenSSL.
"""
def __init__(self, protocol: int) -> None:
self.protocol = _openssl_versions[protocol]
self._ctx = OpenSSL.SSL.Context(self.protocol)
self._options = 0
self.check_hostname = False
self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED
@property
def options(self) -> int:
return self._options
@options.setter
def options(self, value: int) -> None:
self._options = value
self._set_ctx_options()
@property
def verify_mode(self) -> int:
return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()]
@verify_mode.setter
def verify_mode(self, value: ssl.VerifyMode) -> None:
self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback)
def set_default_verify_paths(self) -> None:
self._ctx.set_default_verify_paths()
def set_ciphers(self, ciphers: bytes | str) -> None:
if isinstance(ciphers, str):
ciphers = ciphers.encode("utf-8")
self._ctx.set_cipher_list(ciphers)
def load_verify_locations(
self,
cafile: str | None = None,
capath: str | None = None,
cadata: bytes | None = None,
) -> None:
if cafile is not None:
cafile = cafile.encode("utf-8") # type: ignore[assignment]
if capath is not None:
capath = capath.encode("utf-8") # type: ignore[assignment]
try:
self._ctx.load_verify_locations(cafile, capath)
if cadata is not None:
self._ctx.load_verify_locations(BytesIO(cadata))
except OpenSSL.SSL.Error as e:
raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e
def load_cert_chain(
self,
certfile: str,
keyfile: str | None = None,
password: str | None = None,
) -> None:
try:
self._ctx.use_certificate_chain_file(certfile)
if password is not None:
if not isinstance(password, bytes):
password = password.encode("utf-8") # type: ignore[assignment]
self._ctx.set_passwd_cb(lambda *_: password)
self._ctx.use_privatekey_file(keyfile or certfile)
except OpenSSL.SSL.Error as e:
raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e
def set_alpn_protocols(self, protocols: list[bytes | str]) -> None:
protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
return self._ctx.set_alpn_protos(protocols) # type: ignore[no-any-return]
def wrap_socket(
self,
sock: socket_cls,
server_side: bool = False,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
server_hostname: bytes | str | None = None,
) -> WrappedSocket:
cnx = OpenSSL.SSL.Connection(self._ctx, sock)
# If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3
if server_hostname and not util.ssl_.is_ipaddress(server_hostname):
if isinstance(server_hostname, str):
server_hostname = server_hostname.encode("utf-8")
cnx.set_tlsext_host_name(server_hostname)
cnx.set_connect_state()
while True:
try:
cnx.do_handshake()
except OpenSSL.SSL.WantReadError as e:
if not util.wait_for_read(sock, sock.gettimeout()):
raise timeout("select timed out") from e
continue
except OpenSSL.SSL.Error as e:
raise ssl.SSLError(f"bad handshake: {e!r}") from e
break
return WrappedSocket(cnx, sock)
def _set_ctx_options(self) -> None:
self._ctx.set_options(
self._options
| _openssl_to_ssl_minimum_version[self._minimum_version]
| _openssl_to_ssl_maximum_version[self._maximum_version]
)
@property
def minimum_version(self) -> int:
return self._minimum_version
@minimum_version.setter
def minimum_version(self, minimum_version: int) -> None:
self._minimum_version = minimum_version
self._set_ctx_options()
@property
def maximum_version(self) -> int:
return self._maximum_version
@maximum_version.setter
def maximum_version(self, maximum_version: int) -> None:
self._maximum_version = maximum_version
self._set_ctx_options()
def _verify_callback(
cnx: OpenSSL.SSL.Connection,
x509: X509,
err_no: int,
err_depth: int,
return_code: int,
) -> bool:
return err_no == 0

View File

@ -0,0 +1,913 @@
"""
SecureTranport support for urllib3 via ctypes.
This makes platform-native TLS available to urllib3 users on macOS without the
use of a compiler. This is an important feature because the Python Package
Index is moving to become a TLSv1.2-or-higher server, and the default OpenSSL
that ships with macOS is not capable of doing TLSv1.2. The only way to resolve
this is to give macOS users an alternative solution to the problem, and that
solution is to use SecureTransport.
We use ctypes here because this solution must not require a compiler. That's
because pip is not allowed to require a compiler either.
This is not intended to be a seriously long-term solution to this problem.
The hope is that PEP 543 will eventually solve this issue for us, at which
point we can retire this contrib module. But in the short term, we need to
solve the impending tire fire that is Python on Mac without this kind of
contrib module. So...here we are.
To use this module, simply import and inject it::
import urllib3.contrib.securetransport
urllib3.contrib.securetransport.inject_into_urllib3()
Happy TLSing!
This code is a bastardised version of the code found in Will Bond's oscrypto
library. An enormous debt is owed to him for blazing this trail for us. For
that reason, this code should be considered to be covered both by urllib3's
license and by oscrypto's:
.. code-block::
Copyright (c) 2015-2016 Will Bond <will@wbond.net>
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
"""
from __future__ import annotations
import contextlib
import ctypes
import errno
import os.path
import shutil
import socket
import ssl
import struct
import threading
import typing
import warnings
import weakref
from socket import socket as socket_cls
from .. import util
from ._securetransport.bindings import ( # type: ignore[attr-defined]
CoreFoundation,
Security,
)
from ._securetransport.low_level import (
SecurityConst,
_assert_no_error,
_build_tls_unknown_ca_alert,
_cert_array_from_pem,
_create_cfstring_array,
_load_client_cert_chain,
_temporary_keychain,
)
warnings.warn(
"'urllib3.contrib.securetransport' module is deprecated and will be removed "
"in urllib3 v2.1.0. Read more in this issue: "
"https://github.com/urllib3/urllib3/issues/2681",
category=DeprecationWarning,
stacklevel=2,
)
if typing.TYPE_CHECKING:
from typing_extensions import Literal
__all__ = ["inject_into_urllib3", "extract_from_urllib3"]
orig_util_SSLContext = util.ssl_.SSLContext
# This dictionary is used by the read callback to obtain a handle to the
# calling wrapped socket. This is a pretty silly approach, but for now it'll
# do. I feel like I should be able to smuggle a handle to the wrapped socket
# directly in the SSLConnectionRef, but for now this approach will work I
# guess.
#
# We need to lock around this structure for inserts, but we don't do it for
# reads/writes in the callbacks. The reasoning here goes as follows:
#
# 1. It is not possible to call into the callbacks before the dictionary is
# populated, so once in the callback the id must be in the dictionary.
# 2. The callbacks don't mutate the dictionary, they only read from it, and
# so cannot conflict with any of the insertions.
#
# This is good: if we had to lock in the callbacks we'd drastically slow down
# the performance of this code.
_connection_refs: weakref.WeakValueDictionary[
int, WrappedSocket
] = weakref.WeakValueDictionary()
_connection_ref_lock = threading.Lock()
# Limit writes to 16kB. This is OpenSSL's limit, but we'll cargo-cult it over
# for no better reason than we need *a* limit, and this one is right there.
SSL_WRITE_BLOCKSIZE = 16384
# Basically this is simple: for PROTOCOL_SSLv23 we turn it into a low of
# TLSv1 and a high of TLSv1.2. For everything else, we pin to that version.
# TLSv1 to 1.2 are supported on macOS 10.8+
_protocol_to_min_max = {
util.ssl_.PROTOCOL_TLS: (SecurityConst.kTLSProtocol1, SecurityConst.kTLSProtocol12), # type: ignore[attr-defined]
util.ssl_.PROTOCOL_TLS_CLIENT: ( # type: ignore[attr-defined]
SecurityConst.kTLSProtocol1,
SecurityConst.kTLSProtocol12,
),
}
if hasattr(ssl, "PROTOCOL_SSLv2"):
_protocol_to_min_max[ssl.PROTOCOL_SSLv2] = (
SecurityConst.kSSLProtocol2,
SecurityConst.kSSLProtocol2,
)
if hasattr(ssl, "PROTOCOL_SSLv3"):
_protocol_to_min_max[ssl.PROTOCOL_SSLv3] = (
SecurityConst.kSSLProtocol3,
SecurityConst.kSSLProtocol3,
)
if hasattr(ssl, "PROTOCOL_TLSv1"):
_protocol_to_min_max[ssl.PROTOCOL_TLSv1] = (
SecurityConst.kTLSProtocol1,
SecurityConst.kTLSProtocol1,
)
if hasattr(ssl, "PROTOCOL_TLSv1_1"):
_protocol_to_min_max[ssl.PROTOCOL_TLSv1_1] = (
SecurityConst.kTLSProtocol11,
SecurityConst.kTLSProtocol11,
)
if hasattr(ssl, "PROTOCOL_TLSv1_2"):
_protocol_to_min_max[ssl.PROTOCOL_TLSv1_2] = (
SecurityConst.kTLSProtocol12,
SecurityConst.kTLSProtocol12,
)
_tls_version_to_st: dict[int, int] = {
ssl.TLSVersion.MINIMUM_SUPPORTED: SecurityConst.kTLSProtocol1,
ssl.TLSVersion.TLSv1: SecurityConst.kTLSProtocol1,
ssl.TLSVersion.TLSv1_1: SecurityConst.kTLSProtocol11,
ssl.TLSVersion.TLSv1_2: SecurityConst.kTLSProtocol12,
ssl.TLSVersion.MAXIMUM_SUPPORTED: SecurityConst.kTLSProtocol12,
}
def inject_into_urllib3() -> None:
"""
Monkey-patch urllib3 with SecureTransport-backed SSL-support.
"""
util.SSLContext = SecureTransportContext # type: ignore[assignment]
util.ssl_.SSLContext = SecureTransportContext # type: ignore[assignment]
util.IS_SECURETRANSPORT = True
util.ssl_.IS_SECURETRANSPORT = True
def extract_from_urllib3() -> None:
"""
Undo monkey-patching by :func:`inject_into_urllib3`.
"""
util.SSLContext = orig_util_SSLContext
util.ssl_.SSLContext = orig_util_SSLContext
util.IS_SECURETRANSPORT = False
util.ssl_.IS_SECURETRANSPORT = False
def _read_callback(
connection_id: int, data_buffer: int, data_length_pointer: bytearray
) -> int:
"""
SecureTransport read callback. This is called by ST to request that data
be returned from the socket.
"""
wrapped_socket = None
try:
wrapped_socket = _connection_refs.get(connection_id)
if wrapped_socket is None:
return SecurityConst.errSSLInternal
base_socket = wrapped_socket.socket
requested_length = data_length_pointer[0]
timeout = wrapped_socket.gettimeout()
error = None
read_count = 0
try:
while read_count < requested_length:
if timeout is None or timeout >= 0:
if not util.wait_for_read(base_socket, timeout):
raise OSError(errno.EAGAIN, "timed out")
remaining = requested_length - read_count
buffer = (ctypes.c_char * remaining).from_address(
data_buffer + read_count
)
chunk_size = base_socket.recv_into(buffer, remaining)
read_count += chunk_size
if not chunk_size:
if not read_count:
return SecurityConst.errSSLClosedGraceful
break
except OSError as e:
error = e.errno
if error is not None and error != errno.EAGAIN:
data_length_pointer[0] = read_count
if error == errno.ECONNRESET or error == errno.EPIPE:
return SecurityConst.errSSLClosedAbort
raise
data_length_pointer[0] = read_count
if read_count != requested_length:
return SecurityConst.errSSLWouldBlock
return 0
except Exception as e:
if wrapped_socket is not None:
wrapped_socket._exception = e
return SecurityConst.errSSLInternal
def _write_callback(
connection_id: int, data_buffer: int, data_length_pointer: bytearray
) -> int:
"""
SecureTransport write callback. This is called by ST to request that data
actually be sent on the network.
"""
wrapped_socket = None
try:
wrapped_socket = _connection_refs.get(connection_id)
if wrapped_socket is None:
return SecurityConst.errSSLInternal
base_socket = wrapped_socket.socket
bytes_to_write = data_length_pointer[0]
data = ctypes.string_at(data_buffer, bytes_to_write)
timeout = wrapped_socket.gettimeout()
error = None
sent = 0
try:
while sent < bytes_to_write:
if timeout is None or timeout >= 0:
if not util.wait_for_write(base_socket, timeout):
raise OSError(errno.EAGAIN, "timed out")
chunk_sent = base_socket.send(data)
sent += chunk_sent
# This has some needless copying here, but I'm not sure there's
# much value in optimising this data path.
data = data[chunk_sent:]
except OSError as e:
error = e.errno
if error is not None and error != errno.EAGAIN:
data_length_pointer[0] = sent
if error == errno.ECONNRESET or error == errno.EPIPE:
return SecurityConst.errSSLClosedAbort
raise
data_length_pointer[0] = sent
if sent != bytes_to_write:
return SecurityConst.errSSLWouldBlock
return 0
except Exception as e:
if wrapped_socket is not None:
wrapped_socket._exception = e
return SecurityConst.errSSLInternal
# We need to keep these two objects references alive: if they get GC'd while
# in use then SecureTransport could attempt to call a function that is in freed
# memory. That would be...uh...bad. Yeah, that's the word. Bad.
_read_callback_pointer = Security.SSLReadFunc(_read_callback)
_write_callback_pointer = Security.SSLWriteFunc(_write_callback)
class WrappedSocket:
"""
API-compatibility wrapper for Python's OpenSSL wrapped socket object.
"""
def __init__(self, socket: socket_cls) -> None:
self.socket = socket
self.context = None
self._io_refs = 0
self._closed = False
self._real_closed = False
self._exception: Exception | None = None
self._keychain = None
self._keychain_dir: str | None = None
self._client_cert_chain = None
# We save off the previously-configured timeout and then set it to
# zero. This is done because we use select and friends to handle the
# timeouts, but if we leave the timeout set on the lower socket then
# Python will "kindly" call select on that socket again for us. Avoid
# that by forcing the timeout to zero.
self._timeout = self.socket.gettimeout()
self.socket.settimeout(0)
@contextlib.contextmanager
def _raise_on_error(self) -> typing.Generator[None, None, None]:
"""
A context manager that can be used to wrap calls that do I/O from
SecureTransport. If any of the I/O callbacks hit an exception, this
context manager will correctly propagate the exception after the fact.
This avoids silently swallowing those exceptions.
It also correctly forces the socket closed.
"""
self._exception = None
# We explicitly don't catch around this yield because in the unlikely
# event that an exception was hit in the block we don't want to swallow
# it.
yield
if self._exception is not None:
exception, self._exception = self._exception, None
self._real_close()
raise exception
def _set_alpn_protocols(self, protocols: list[bytes] | None) -> None:
"""
Sets up the ALPN protocols on the context.
"""
if not protocols:
return
protocols_arr = _create_cfstring_array(protocols)
try:
result = Security.SSLSetALPNProtocols(self.context, protocols_arr)
_assert_no_error(result)
finally:
CoreFoundation.CFRelease(protocols_arr)
def _custom_validate(self, verify: bool, trust_bundle: bytes | None) -> None:
"""
Called when we have set custom validation. We do this in two cases:
first, when cert validation is entirely disabled; and second, when
using a custom trust DB.
Raises an SSLError if the connection is not trusted.
"""
# If we disabled cert validation, just say: cool.
if not verify or trust_bundle is None:
return
successes = (
SecurityConst.kSecTrustResultUnspecified,
SecurityConst.kSecTrustResultProceed,
)
try:
trust_result = self._evaluate_trust(trust_bundle)
if trust_result in successes:
return
reason = f"error code: {int(trust_result)}"
exc = None
except Exception as e:
# Do not trust on error
reason = f"exception: {e!r}"
exc = e
# SecureTransport does not send an alert nor shuts down the connection.
rec = _build_tls_unknown_ca_alert(self.version())
self.socket.sendall(rec)
# close the connection immediately
# l_onoff = 1, activate linger
# l_linger = 0, linger for 0 seoncds
opts = struct.pack("ii", 1, 0)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, opts)
self._real_close()
raise ssl.SSLError(f"certificate verify failed, {reason}") from exc
def _evaluate_trust(self, trust_bundle: bytes) -> int:
# We want data in memory, so load it up.
if os.path.isfile(trust_bundle):
with open(trust_bundle, "rb") as f:
trust_bundle = f.read()
cert_array = None
trust = Security.SecTrustRef()
try:
# Get a CFArray that contains the certs we want.
cert_array = _cert_array_from_pem(trust_bundle)
# Ok, now the hard part. We want to get the SecTrustRef that ST has
# created for this connection, shove our CAs into it, tell ST to
# ignore everything else it knows, and then ask if it can build a
# chain. This is a buuuunch of code.
result = Security.SSLCopyPeerTrust(self.context, ctypes.byref(trust))
_assert_no_error(result)
if not trust:
raise ssl.SSLError("Failed to copy trust reference")
result = Security.SecTrustSetAnchorCertificates(trust, cert_array)
_assert_no_error(result)
result = Security.SecTrustSetAnchorCertificatesOnly(trust, True)
_assert_no_error(result)
trust_result = Security.SecTrustResultType()
result = Security.SecTrustEvaluate(trust, ctypes.byref(trust_result))
_assert_no_error(result)
finally:
if trust:
CoreFoundation.CFRelease(trust)
if cert_array is not None:
CoreFoundation.CFRelease(cert_array)
return trust_result.value # type: ignore[no-any-return]
def handshake(
self,
server_hostname: bytes | str | None,
verify: bool,
trust_bundle: bytes | None,
min_version: int,
max_version: int,
client_cert: str | None,
client_key: str | None,
client_key_passphrase: typing.Any,
alpn_protocols: list[bytes] | None,
) -> None:
"""
Actually performs the TLS handshake. This is run automatically by
wrapped socket, and shouldn't be needed in user code.
"""
# First, we do the initial bits of connection setup. We need to create
# a context, set its I/O funcs, and set the connection reference.
self.context = Security.SSLCreateContext(
None, SecurityConst.kSSLClientSide, SecurityConst.kSSLStreamType
)
result = Security.SSLSetIOFuncs(
self.context, _read_callback_pointer, _write_callback_pointer
)
_assert_no_error(result)
# Here we need to compute the handle to use. We do this by taking the
# id of self modulo 2**31 - 1. If this is already in the dictionary, we
# just keep incrementing by one until we find a free space.
with _connection_ref_lock:
handle = id(self) % 2147483647
while handle in _connection_refs:
handle = (handle + 1) % 2147483647
_connection_refs[handle] = self
result = Security.SSLSetConnection(self.context, handle)
_assert_no_error(result)
# If we have a server hostname, we should set that too.
# RFC6066 Section 3 tells us not to use SNI when the host is an IP, but we have
# to do it anyway to match server_hostname against the server certificate
if server_hostname:
if not isinstance(server_hostname, bytes):
server_hostname = server_hostname.encode("utf-8")
result = Security.SSLSetPeerDomainName(
self.context, server_hostname, len(server_hostname)
)
_assert_no_error(result)
# Setup the ALPN protocols.
self._set_alpn_protocols(alpn_protocols)
# Set the minimum and maximum TLS versions.
result = Security.SSLSetProtocolVersionMin(self.context, min_version)
_assert_no_error(result)
result = Security.SSLSetProtocolVersionMax(self.context, max_version)
_assert_no_error(result)
# If there's a trust DB, we need to use it. We do that by telling
# SecureTransport to break on server auth. We also do that if we don't
# want to validate the certs at all: we just won't actually do any
# authing in that case.
if not verify or trust_bundle is not None:
result = Security.SSLSetSessionOption(
self.context, SecurityConst.kSSLSessionOptionBreakOnServerAuth, True
)
_assert_no_error(result)
# If there's a client cert, we need to use it.
if client_cert:
self._keychain, self._keychain_dir = _temporary_keychain()
self._client_cert_chain = _load_client_cert_chain(
self._keychain, client_cert, client_key
)
result = Security.SSLSetCertificate(self.context, self._client_cert_chain)
_assert_no_error(result)
while True:
with self._raise_on_error():
result = Security.SSLHandshake(self.context)
if result == SecurityConst.errSSLWouldBlock:
raise socket.timeout("handshake timed out")
elif result == SecurityConst.errSSLServerAuthCompleted:
self._custom_validate(verify, trust_bundle)
continue
else:
_assert_no_error(result)
break
def fileno(self) -> int:
return self.socket.fileno()
# Copy-pasted from Python 3.5 source code
def _decref_socketios(self) -> None:
if self._io_refs > 0:
self._io_refs -= 1
if self._closed:
self.close()
def recv(self, bufsiz: int) -> bytes:
buffer = ctypes.create_string_buffer(bufsiz)
bytes_read = self.recv_into(buffer, bufsiz)
data = buffer[:bytes_read]
return typing.cast(bytes, data)
def recv_into(
self, buffer: ctypes.Array[ctypes.c_char], nbytes: int | None = None
) -> int:
# Read short on EOF.
if self._real_closed:
return 0
if nbytes is None:
nbytes = len(buffer)
buffer = (ctypes.c_char * nbytes).from_buffer(buffer)
processed_bytes = ctypes.c_size_t(0)
with self._raise_on_error():
result = Security.SSLRead(
self.context, buffer, nbytes, ctypes.byref(processed_bytes)
)
# There are some result codes that we want to treat as "not always
# errors". Specifically, those are errSSLWouldBlock,
# errSSLClosedGraceful, and errSSLClosedNoNotify.
if result == SecurityConst.errSSLWouldBlock:
# If we didn't process any bytes, then this was just a time out.
# However, we can get errSSLWouldBlock in situations when we *did*
# read some data, and in those cases we should just read "short"
# and return.
if processed_bytes.value == 0:
# Timed out, no data read.
raise socket.timeout("recv timed out")
elif result in (
SecurityConst.errSSLClosedGraceful,
SecurityConst.errSSLClosedNoNotify,
):
# The remote peer has closed this connection. We should do so as
# well. Note that we don't actually return here because in
# principle this could actually be fired along with return data.
# It's unlikely though.
self._real_close()
else:
_assert_no_error(result)
# Ok, we read and probably succeeded. We should return whatever data
# was actually read.
return processed_bytes.value
def settimeout(self, timeout: float) -> None:
self._timeout = timeout
def gettimeout(self) -> float | None:
return self._timeout
def send(self, data: bytes) -> int:
processed_bytes = ctypes.c_size_t(0)
with self._raise_on_error():
result = Security.SSLWrite(
self.context, data, len(data), ctypes.byref(processed_bytes)
)
if result == SecurityConst.errSSLWouldBlock and processed_bytes.value == 0:
# Timed out
raise socket.timeout("send timed out")
else:
_assert_no_error(result)
# We sent, and probably succeeded. Tell them how much we sent.
return processed_bytes.value
def sendall(self, data: bytes) -> None:
total_sent = 0
while total_sent < len(data):
sent = self.send(data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE])
total_sent += sent
def shutdown(self) -> None:
with self._raise_on_error():
Security.SSLClose(self.context)
def close(self) -> None:
self._closed = True
# TODO: should I do clean shutdown here? Do I have to?
if self._io_refs <= 0:
self._real_close()
def _real_close(self) -> None:
self._real_closed = True
if self.context:
CoreFoundation.CFRelease(self.context)
self.context = None
if self._client_cert_chain:
CoreFoundation.CFRelease(self._client_cert_chain)
self._client_cert_chain = None
if self._keychain:
Security.SecKeychainDelete(self._keychain)
CoreFoundation.CFRelease(self._keychain)
shutil.rmtree(self._keychain_dir)
self._keychain = self._keychain_dir = None
return self.socket.close()
def getpeercert(self, binary_form: bool = False) -> bytes | None:
# Urgh, annoying.
#
# Here's how we do this:
#
# 1. Call SSLCopyPeerTrust to get hold of the trust object for this
# connection.
# 2. Call SecTrustGetCertificateAtIndex for index 0 to get the leaf.
# 3. To get the CN, call SecCertificateCopyCommonName and process that
# string so that it's of the appropriate type.
# 4. To get the SAN, we need to do something a bit more complex:
# a. Call SecCertificateCopyValues to get the data, requesting
# kSecOIDSubjectAltName.
# b. Mess about with this dictionary to try to get the SANs out.
#
# This is gross. Really gross. It's going to be a few hundred LoC extra
# just to repeat something that SecureTransport can *already do*. So my
# operating assumption at this time is that what we want to do is
# instead to just flag to urllib3 that it shouldn't do its own hostname
# validation when using SecureTransport.
if not binary_form:
raise ValueError("SecureTransport only supports dumping binary certs")
trust = Security.SecTrustRef()
certdata = None
der_bytes = None
try:
# Grab the trust store.
result = Security.SSLCopyPeerTrust(self.context, ctypes.byref(trust))
_assert_no_error(result)
if not trust:
# Probably we haven't done the handshake yet. No biggie.
return None
cert_count = Security.SecTrustGetCertificateCount(trust)
if not cert_count:
# Also a case that might happen if we haven't handshaked.
# Handshook? Handshaken?
return None
leaf = Security.SecTrustGetCertificateAtIndex(trust, 0)
assert leaf
# Ok, now we want the DER bytes.
certdata = Security.SecCertificateCopyData(leaf)
assert certdata
data_length = CoreFoundation.CFDataGetLength(certdata)
data_buffer = CoreFoundation.CFDataGetBytePtr(certdata)
der_bytes = ctypes.string_at(data_buffer, data_length)
finally:
if certdata:
CoreFoundation.CFRelease(certdata)
if trust:
CoreFoundation.CFRelease(trust)
return der_bytes
def version(self) -> str:
protocol = Security.SSLProtocol()
result = Security.SSLGetNegotiatedProtocolVersion(
self.context, ctypes.byref(protocol)
)
_assert_no_error(result)
if protocol.value == SecurityConst.kTLSProtocol13:
raise ssl.SSLError("SecureTransport does not support TLS 1.3")
elif protocol.value == SecurityConst.kTLSProtocol12:
return "TLSv1.2"
elif protocol.value == SecurityConst.kTLSProtocol11:
return "TLSv1.1"
elif protocol.value == SecurityConst.kTLSProtocol1:
return "TLSv1"
elif protocol.value == SecurityConst.kSSLProtocol3:
return "SSLv3"
elif protocol.value == SecurityConst.kSSLProtocol2:
return "SSLv2"
else:
raise ssl.SSLError(f"Unknown TLS version: {protocol!r}")
def makefile(
self: socket_cls,
mode: (
Literal["r"] | Literal["w"] | Literal["rw"] | Literal["wr"] | Literal[""]
) = "r",
buffering: int | None = None,
*args: typing.Any,
**kwargs: typing.Any,
) -> typing.BinaryIO | typing.TextIO:
# We disable buffering with SecureTransport because it conflicts with
# the buffering that ST does internally (see issue #1153 for more).
buffering = 0
return socket_cls.makefile(self, mode, buffering, *args, **kwargs)
WrappedSocket.makefile = makefile # type: ignore[attr-defined]
class SecureTransportContext:
"""
I am a wrapper class for the SecureTransport library, to translate the
interface of the standard library ``SSLContext`` object to calls into
SecureTransport.
"""
def __init__(self, protocol: int) -> None:
self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED
self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED
if protocol not in (None, ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_CLIENT):
self._min_version, self._max_version = _protocol_to_min_max[protocol]
self._options = 0
self._verify = False
self._trust_bundle: bytes | None = None
self._client_cert: str | None = None
self._client_key: str | None = None
self._client_key_passphrase = None
self._alpn_protocols: list[bytes] | None = None
@property
def check_hostname(self) -> Literal[True]:
"""
SecureTransport cannot have its hostname checking disabled. For more,
see the comment on getpeercert() in this file.
"""
return True
@check_hostname.setter
def check_hostname(self, value: typing.Any) -> None:
"""
SecureTransport cannot have its hostname checking disabled. For more,
see the comment on getpeercert() in this file.
"""
@property
def options(self) -> int:
# TODO: Well, crap.
#
# So this is the bit of the code that is the most likely to cause us
# trouble. Essentially we need to enumerate all of the SSL options that
# users might want to use and try to see if we can sensibly translate
# them, or whether we should just ignore them.
return self._options
@options.setter
def options(self, value: int) -> None:
# TODO: Update in line with above.
self._options = value
@property
def verify_mode(self) -> int:
return ssl.CERT_REQUIRED if self._verify else ssl.CERT_NONE
@verify_mode.setter
def verify_mode(self, value: int) -> None:
self._verify = value == ssl.CERT_REQUIRED
def set_default_verify_paths(self) -> None:
# So, this has to do something a bit weird. Specifically, what it does
# is nothing.
#
# This means that, if we had previously had load_verify_locations
# called, this does not undo that. We need to do that because it turns
# out that the rest of the urllib3 code will attempt to load the
# default verify paths if it hasn't been told about any paths, even if
# the context itself was sometime earlier. We resolve that by just
# ignoring it.
pass
def load_default_certs(self) -> None:
return self.set_default_verify_paths()
def set_ciphers(self, ciphers: typing.Any) -> None:
raise ValueError("SecureTransport doesn't support custom cipher strings")
def load_verify_locations(
self,
cafile: str | None = None,
capath: str | None = None,
cadata: bytes | None = None,
) -> None:
# OK, we only really support cadata and cafile.
if capath is not None:
raise ValueError("SecureTransport does not support cert directories")
# Raise if cafile does not exist.
if cafile is not None:
with open(cafile):
pass
self._trust_bundle = cafile or cadata # type: ignore[assignment]
def load_cert_chain(
self,
certfile: str,
keyfile: str | None = None,
password: str | None = None,
) -> None:
self._client_cert = certfile
self._client_key = keyfile
self._client_cert_passphrase = password
def set_alpn_protocols(self, protocols: list[str | bytes]) -> None:
"""
Sets the ALPN protocols that will later be set on the context.
Raises a NotImplementedError if ALPN is not supported.
"""
if not hasattr(Security, "SSLSetALPNProtocols"):
raise NotImplementedError(
"SecureTransport supports ALPN only in macOS 10.12+"
)
self._alpn_protocols = [util.util.to_bytes(p, "ascii") for p in protocols]
def wrap_socket(
self,
sock: socket_cls,
server_side: bool = False,
do_handshake_on_connect: bool = True,
suppress_ragged_eofs: bool = True,
server_hostname: bytes | str | None = None,
) -> WrappedSocket:
# So, what do we do here? Firstly, we assert some properties. This is a
# stripped down shim, so there is some functionality we don't support.
# See PEP 543 for the real deal.
assert not server_side
assert do_handshake_on_connect
assert suppress_ragged_eofs
# Ok, we're good to go. Now we want to create the wrapped socket object
# and store it in the appropriate place.
wrapped_socket = WrappedSocket(sock)
# Now we can handshake
wrapped_socket.handshake(
server_hostname,
self._verify,
self._trust_bundle,
_tls_version_to_st[self._minimum_version],
_tls_version_to_st[self._maximum_version],
self._client_cert,
self._client_key,
self._client_key_passphrase,
self._alpn_protocols,
)
return wrapped_socket
@property
def minimum_version(self) -> int:
return self._minimum_version
@minimum_version.setter
def minimum_version(self, minimum_version: int) -> None:
self._minimum_version = minimum_version
@property
def maximum_version(self) -> int:
return self._maximum_version
@maximum_version.setter
def maximum_version(self, maximum_version: int) -> None:
self._maximum_version = maximum_version

View File

@ -0,0 +1,233 @@
"""
This module contains provisional support for SOCKS proxies from within
urllib3. This module supports SOCKS4, SOCKS4A (an extension of SOCKS4), and
SOCKS5. To enable its functionality, either install PySocks or install this
module with the ``socks`` extra.
The SOCKS implementation supports the full range of urllib3 features. It also
supports the following SOCKS features:
- SOCKS4A (``proxy_url='socks4a://...``)
- SOCKS4 (``proxy_url='socks4://...``)
- SOCKS5 with remote DNS (``proxy_url='socks5h://...``)
- SOCKS5 with local DNS (``proxy_url='socks5://...``)
- Usernames and passwords for the SOCKS proxy
.. note::
It is recommended to use ``socks5h://`` or ``socks4a://`` schemes in
your ``proxy_url`` to ensure that DNS resolution is done from the remote
server instead of client-side when connecting to a domain name.
SOCKS4 supports IPv4 and domain names with the SOCKS4A extension. SOCKS5
supports IPv4, IPv6, and domain names.
When connecting to a SOCKS4 proxy the ``username`` portion of the ``proxy_url``
will be sent as the ``userid`` section of the SOCKS request:
.. code-block:: python
proxy_url="socks4a://<userid>@proxy-host"
When connecting to a SOCKS5 proxy the ``username`` and ``password`` portion
of the ``proxy_url`` will be sent as the username/password to authenticate
with the proxy:
.. code-block:: python
proxy_url="socks5h://<username>:<password>@proxy-host"
"""
from __future__ import annotations
try:
import socks # type: ignore[import]
except ImportError:
import warnings
from ..exceptions import DependencyWarning
warnings.warn(
(
"SOCKS support in urllib3 requires the installation of optional "
"dependencies: specifically, PySocks. For more information, see "
"https://urllib3.readthedocs.io/en/latest/contrib.html#socks-proxies"
),
DependencyWarning,
)
raise
import typing
from socket import timeout as SocketTimeout
from ..connection import HTTPConnection, HTTPSConnection
from ..connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from ..exceptions import ConnectTimeoutError, NewConnectionError
from ..poolmanager import PoolManager
from ..util.url import parse_url
try:
import ssl
except ImportError:
ssl = None # type: ignore[assignment]
try:
from typing import TypedDict
class _TYPE_SOCKS_OPTIONS(TypedDict):
socks_version: int
proxy_host: str | None
proxy_port: str | None
username: str | None
password: str | None
rdns: bool
except ImportError: # Python 3.7
_TYPE_SOCKS_OPTIONS = typing.Dict[str, typing.Any] # type: ignore[misc, assignment]
class SOCKSConnection(HTTPConnection):
"""
A plain-text HTTP connection that connects via a SOCKS proxy.
"""
def __init__(
self,
_socks_options: _TYPE_SOCKS_OPTIONS,
*args: typing.Any,
**kwargs: typing.Any,
) -> None:
self._socks_options = _socks_options
super().__init__(*args, **kwargs)
def _new_conn(self) -> socks.socksocket:
"""
Establish a new connection via the SOCKS proxy.
"""
extra_kw: dict[str, typing.Any] = {}
if self.source_address:
extra_kw["source_address"] = self.source_address
if self.socket_options:
extra_kw["socket_options"] = self.socket_options
try:
conn = socks.create_connection(
(self.host, self.port),
proxy_type=self._socks_options["socks_version"],
proxy_addr=self._socks_options["proxy_host"],
proxy_port=self._socks_options["proxy_port"],
proxy_username=self._socks_options["username"],
proxy_password=self._socks_options["password"],
proxy_rdns=self._socks_options["rdns"],
timeout=self.timeout,
**extra_kw,
)
except SocketTimeout as e:
raise ConnectTimeoutError(
self,
f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
) from e
except socks.ProxyError as e:
# This is fragile as hell, but it seems to be the only way to raise
# useful errors here.
if e.socket_err:
error = e.socket_err
if isinstance(error, SocketTimeout):
raise ConnectTimeoutError(
self,
f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
) from e
else:
# Adding `from e` messes with coverage somehow, so it's omitted.
# See #2386.
raise NewConnectionError(
self, f"Failed to establish a new connection: {error}"
)
else:
raise NewConnectionError(
self, f"Failed to establish a new connection: {e}"
) from e
except OSError as e: # Defensive: PySocks should catch all these.
raise NewConnectionError(
self, f"Failed to establish a new connection: {e}"
) from e
return conn
# We don't need to duplicate the Verified/Unverified distinction from
# urllib3/connection.py here because the HTTPSConnection will already have been
# correctly set to either the Verified or Unverified form by that module. This
# means the SOCKSHTTPSConnection will automatically be the correct type.
class SOCKSHTTPSConnection(SOCKSConnection, HTTPSConnection):
pass
class SOCKSHTTPConnectionPool(HTTPConnectionPool):
ConnectionCls = SOCKSConnection
class SOCKSHTTPSConnectionPool(HTTPSConnectionPool):
ConnectionCls = SOCKSHTTPSConnection
class SOCKSProxyManager(PoolManager):
"""
A version of the urllib3 ProxyManager that routes connections via the
defined SOCKS proxy.
"""
pool_classes_by_scheme = {
"http": SOCKSHTTPConnectionPool,
"https": SOCKSHTTPSConnectionPool,
}
def __init__(
self,
proxy_url: str,
username: str | None = None,
password: str | None = None,
num_pools: int = 10,
headers: typing.Mapping[str, str] | None = None,
**connection_pool_kw: typing.Any,
):
parsed = parse_url(proxy_url)
if username is None and password is None and parsed.auth is not None:
split = parsed.auth.split(":")
if len(split) == 2:
username, password = split
if parsed.scheme == "socks5":
socks_version = socks.PROXY_TYPE_SOCKS5
rdns = False
elif parsed.scheme == "socks5h":
socks_version = socks.PROXY_TYPE_SOCKS5
rdns = True
elif parsed.scheme == "socks4":
socks_version = socks.PROXY_TYPE_SOCKS4
rdns = False
elif parsed.scheme == "socks4a":
socks_version = socks.PROXY_TYPE_SOCKS4
rdns = True
else:
raise ValueError(f"Unable to determine SOCKS version from {proxy_url}")
self.proxy_url = proxy_url
socks_options = {
"socks_version": socks_version,
"proxy_host": parsed.host,
"proxy_port": parsed.port,
"username": username,
"password": password,
"rdns": rdns,
}
connection_pool_kw["_socks_options"] = socks_options
super().__init__(num_pools, headers, **connection_pool_kw)
self.pool_classes_by_scheme = SOCKSProxyManager.pool_classes_by_scheme