Skip to content

Instantly share code, notes, and snippets.

@korrosivesec
Last active March 26, 2024 07:45
Show Gist options
  • Save korrosivesec/0311282d327c58c7c8aa7004ffcab1ba to your computer and use it in GitHub Desktop.
Save korrosivesec/0311282d327c58c7c8aa7004ffcab1ba to your computer and use it in GitHub Desktop.
Extract and parse untrusted self-signed certificate from aiohttp ClientResponse object.
import aiohttp
import asyncio
from typing import Text, Awaitable, Tuple
from cryptography import x509
from cryptography.x509 import Certificate
from cryptography.hazmat.primitives import hashes
# Additional fields to parse documented at https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object
def parse_cert(cert: Certificate) -> dict:
cert_dict = {}
cert_dict['fingerprint'] = ':'.join('{:02X}'.format(byte) for byte in cert.fingerprint(hashes.SHA1()))
cert_dict['serial'] = str(cert.serial_number)
cert_dict['start_date'] = cert.not_valid_before.strftime('%Y/%m/%d')
cert_dict['end_date'] = cert.not_valid_after.strftime('%Y/%m/%d')
cert_dict['issuer'] = cert.issuer.rfc4514_string()
cert_dict['subject'] = cert.subject.rfc4514_string()
return cert_dict
async def make_request(session: aiohttp.ClientSession, url: Text) -> Tuple[Awaitable[aiohttp.ClientResponse], dict]:
# ssl = False so that we can connect to servers with self-signed certificates.
async with session.get(url, ssl=False) as response:
# Get SSLSocket object inside the context manager before the response
# object is closed.
socket = response.connection.transport.get_extra_info('ssl_object')
# getpeercert(True) to retrieve the binary version of the cert.
# Without specifiying binary output, the certificate is never parsed
# when certificate validation is disabled.
# https://github.com/python/cpython/blob/401272e6e660445d6556d5cd4db88ed4267a50b3/Modules/_ssl.c#L1819
cert_binary = socket.getpeercert(True)
cert_x509 = x509.load_der_x509_certificate(cert_binary)
cert_dict = parse_cert(cert_x509)
return response, certificate
async def main():
async with aiohttp.ClientSession() as session:
response, certificate = await make_request(session, 'https://google.com')
print(certificate)
print(response)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment