Skip to content

Commit d725654

Browse files
authored
add fuzzer for signing and verifying with cert chain (#527)
Signed-off-by: Adam Korczynski <[email protected]>
1 parent 34d7470 commit d725654

File tree

1 file changed

+375
-0
lines changed

1 file changed

+375
-0
lines changed
Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
# Copyright 2025 The Sigstore Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import datetime as dt
18+
from pathlib import Path
19+
import shutil
20+
import sys
21+
import tempfile
22+
23+
import atheris # type: ignore
24+
from cryptography import x509
25+
from cryptography.exceptions import UnsupportedAlgorithm
26+
from cryptography.hazmat.primitives import hashes
27+
from cryptography.hazmat.primitives import serialization
28+
from cryptography.hazmat.primitives.asymmetric import ec
29+
from cryptography.hazmat.primitives.asymmetric import rsa
30+
from cryptography.x509.oid import ExtendedKeyUsageOID
31+
from cryptography.x509.oid import NameOID
32+
from fuzz_utils import create_fuzz_files
33+
34+
from model_signing import hashing
35+
from model_signing import signing
36+
from model_signing import verifying
37+
38+
39+
def _rand_utf8(
40+
fdp: atheris.FuzzedDataProvider, min_len: int = 1, max_len: int = 32
41+
) -> str:
42+
n = fdp.ConsumeIntInRange(min_len, max_len)
43+
data = fdp.ConsumeBytes(n)
44+
if not data:
45+
return "x"
46+
s = "".join(chr(32 + (c % 95)) for c in data).strip()
47+
return s or "x"
48+
49+
50+
def gen_private_key(fdp: atheris.FuzzedDataProvider):
51+
"""Generate RSA or EC private key using fuzz data (for CAs)."""
52+
if fdp.ConsumeBool():
53+
curve = fdp.PickValueInList(
54+
[ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]
55+
)
56+
return ec.generate_private_key(curve)
57+
key_size = fdp.PickValueInList([1024, 2048])
58+
return rsa.generate_private_key(public_exponent=65537, key_size=key_size)
59+
60+
61+
def gen_ec_key(fdp: atheris.FuzzedDataProvider):
62+
"""Generate an EC private key (for the leaf)."""
63+
curve = fdp.PickValueInList(
64+
[ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]
65+
)
66+
return ec.generate_private_key(curve)
67+
68+
69+
def gen_name(fdp: atheris.FuzzedDataProvider) -> x509.Name:
70+
attrs = [x509.NameAttribute(NameOID.COMMON_NAME, _rand_utf8(fdp, 3, 40))]
71+
if fdp.ConsumeBool():
72+
attrs.append(
73+
x509.NameAttribute(
74+
NameOID.ORGANIZATION_NAME, _rand_utf8(fdp, 2, 20)
75+
)
76+
)
77+
if fdp.ConsumeBool():
78+
attrs.append(
79+
x509.NameAttribute(
80+
NameOID.ORGANIZATIONAL_UNIT_NAME, _rand_utf8(fdp, 2, 20)
81+
)
82+
)
83+
if fdp.ConsumeBool():
84+
cb = fdp.ConsumeBytes(2) or b"US"
85+
country = "".join(chr(ord("A") + (b % 26)) for b in cb)
86+
attrs.append(x509.NameAttribute(NameOID.COUNTRY_NAME, country))
87+
if fdp.ConsumeBool():
88+
attrs.append(
89+
x509.NameAttribute(NameOID.LOCALITY_NAME, _rand_utf8(fdp, 2, 20))
90+
)
91+
return x509.Name(attrs)
92+
93+
94+
def _ski(public_key) -> x509.SubjectKeyIdentifier:
95+
return x509.SubjectKeyIdentifier.from_public_key(public_key)
96+
97+
98+
def deterministic_serial(fdp: atheris.FuzzedDataProvider) -> int:
99+
"""Deterministic, positive serial (≤159 bits, non-zero) from input."""
100+
length = fdp.ConsumeIntInRange(1, 20)
101+
b = fdp.ConsumeBytes(length)
102+
if not b:
103+
b = b"\x01"
104+
val = int.from_bytes(b, "big") & ((1 << 159) - 1)
105+
return val or 1
106+
107+
108+
def deterministic_validity(
109+
fdp: atheris.FuzzedDataProvider,
110+
) -> tuple[dt.datetime, dt.datetime]:
111+
"""Validity window derived solely from fuzz input (no wall clock)."""
112+
base = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc)
113+
start_days = fdp.ConsumeIntInRange(0, 9000)
114+
not_before = base + dt.timedelta(days=start_days)
115+
lifetime_days = fdp.ConsumeIntInRange(30, 3650)
116+
not_after = not_before + dt.timedelta(days=lifetime_days)
117+
return not_before, not_after
118+
119+
120+
def _pick_sig_hash(fdp: atheris.FuzzedDataProvider):
121+
return fdp.PickValueInList(
122+
[hashes.SHA256(), hashes.SHA384(), hashes.SHA512()]
123+
)
124+
125+
126+
def build_valid_chain(
127+
fdp: atheris.FuzzedDataProvider,
128+
) -> tuple[x509.Certificate, object, list[x509.Certificate]]:
129+
"""Build a valid chain: root -> 0..3 intermediates -> leaf (depth 1..5).
130+
131+
Returns (leaf_cert, leaf_key, issuers_chain) where issuers_chain is
132+
[nearest_intermediate, ..., root] and does NOT include the leaf.
133+
"""
134+
depth = fdp.ConsumeIntInRange(1, 5)
135+
not_before, not_after = deterministic_validity(fdp)
136+
137+
# Root CA
138+
root_key = gen_private_key(fdp)
139+
root_name = gen_name(fdp)
140+
root_builder = (
141+
x509.CertificateBuilder()
142+
.subject_name(root_name)
143+
.issuer_name(root_name)
144+
.public_key(root_key.public_key())
145+
.serial_number(deterministic_serial(fdp))
146+
.not_valid_before(not_before)
147+
.not_valid_after(not_after)
148+
.add_extension(
149+
x509.BasicConstraints(ca=True, path_length=depth - 1), critical=True
150+
)
151+
.add_extension(
152+
x509.KeyUsage(
153+
digital_signature=False,
154+
content_commitment=False,
155+
key_encipherment=False,
156+
data_encipherment=False,
157+
key_agreement=False,
158+
key_cert_sign=True,
159+
crl_sign=True,
160+
encipher_only=False,
161+
decipher_only=False,
162+
),
163+
critical=True,
164+
)
165+
.add_extension(_ski(root_key.public_key()), critical=False)
166+
)
167+
root_cert = root_builder.sign(
168+
private_key=root_key, algorithm=_pick_sig_hash(fdp)
169+
)
170+
171+
issuer_key = root_key
172+
issuer_cert = root_cert
173+
issuers: list[x509.Certificate] = [root_cert]
174+
175+
# Intermediates
176+
for i in range(depth - 1):
177+
key = gen_private_key(fdp)
178+
name = gen_name(fdp)
179+
inter_builder = (
180+
x509.CertificateBuilder()
181+
.subject_name(name)
182+
.issuer_name(issuer_cert.subject)
183+
.public_key(key.public_key())
184+
.serial_number(deterministic_serial(fdp))
185+
.not_valid_before(not_before)
186+
.not_valid_after(not_after)
187+
.add_extension(
188+
x509.BasicConstraints(ca=True, path_length=(depth - 2 - i)),
189+
critical=True,
190+
)
191+
.add_extension(
192+
x509.KeyUsage(
193+
digital_signature=False,
194+
content_commitment=False,
195+
key_encipherment=False,
196+
data_encipherment=False,
197+
key_agreement=False,
198+
key_cert_sign=True,
199+
crl_sign=True,
200+
encipher_only=False,
201+
decipher_only=False,
202+
),
203+
critical=True,
204+
)
205+
.add_extension(_ski(key.public_key()), critical=False)
206+
.add_extension(
207+
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
208+
x509.SubjectKeyIdentifier.from_public_key(
209+
issuer_key.public_key()
210+
)
211+
),
212+
critical=False,
213+
)
214+
)
215+
inter_cert = inter_builder.sign(
216+
private_key=issuer_key, algorithm=_pick_sig_hash(fdp)
217+
)
218+
issuer_key = key
219+
issuer_cert = inter_cert
220+
issuers.insert(0, inter_cert) # nearest first
221+
222+
# Leaf (code signing) — ALWAYS EC to satisfy signer expectations
223+
leaf_key = gen_ec_key(fdp)
224+
leaf_name = gen_name(fdp)
225+
leaf_builder = (
226+
x509.CertificateBuilder()
227+
.subject_name(leaf_name)
228+
.issuer_name(issuer_cert.subject)
229+
.public_key(leaf_key.public_key())
230+
.serial_number(deterministic_serial(fdp))
231+
.not_valid_before(not_before)
232+
.not_valid_after(not_after)
233+
.add_extension(
234+
x509.BasicConstraints(ca=False, path_length=None), critical=True
235+
)
236+
.add_extension(
237+
x509.KeyUsage(
238+
digital_signature=True,
239+
content_commitment=True,
240+
key_encipherment=False,
241+
data_encipherment=False,
242+
key_agreement=False,
243+
key_cert_sign=False,
244+
crl_sign=False,
245+
encipher_only=False,
246+
decipher_only=False,
247+
),
248+
critical=True,
249+
)
250+
.add_extension(
251+
x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CODE_SIGNING]),
252+
critical=False,
253+
)
254+
.add_extension(_ski(leaf_key.public_key()), critical=False)
255+
.add_extension(
256+
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
257+
x509.SubjectKeyIdentifier.from_public_key(
258+
issuer_key.public_key()
259+
)
260+
),
261+
critical=False,
262+
)
263+
)
264+
leaf_cert = leaf_builder.sign(
265+
private_key=issuer_key, algorithm=_pick_sig_hash(fdp)
266+
)
267+
268+
return leaf_cert, leaf_key, issuers
269+
270+
271+
def to_pem_cert(cert: x509.Certificate) -> bytes:
272+
return cert.public_bytes(encoding=serialization.Encoding.PEM)
273+
274+
275+
def key_to_pem(priv: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey) -> bytes:
276+
return priv.private_bytes(
277+
encoding=serialization.Encoding.PEM,
278+
format=serialization.PrivateFormat.PKCS8,
279+
encryption_algorithm=serialization.NoEncryption(),
280+
)
281+
282+
283+
def TestOneInput(data: bytes):
284+
fdp = atheris.FuzzedDataProvider(data)
285+
286+
# 1) Build certs & keys (catch x509 construction errors)
287+
try:
288+
leaf_cert, leaf_key, issuers = build_valid_chain(fdp)
289+
except (
290+
ValueError,
291+
TypeError,
292+
x509.DuplicateExtension,
293+
x509.UnsupportedGeneralNameType,
294+
UnsupportedAlgorithm,
295+
):
296+
return # skip this testcase; invalid X.509
297+
298+
# 2) Convert to PEM and write to disk (catch serialization errors)
299+
workdir = tempfile.mkdtemp(prefix="fuzz_cert_")
300+
try:
301+
leaf_key_path = Path(workdir) / "leaf-key.pem"
302+
leaf_cert_path = Path(workdir) / "leaf-cert.pem"
303+
chain_paths: list[Path] = []
304+
305+
with open(leaf_key_path, "wb") as f:
306+
f.write(key_to_pem(leaf_key))
307+
with open(leaf_cert_path, "wb") as f:
308+
f.write(to_pem_cert(leaf_cert))
309+
310+
for idx, cert in enumerate(issuers):
311+
p = Path(workdir) / f"chain-{idx}.pem"
312+
with open(p, "wb") as f:
313+
f.write(to_pem_cert(cert))
314+
chain_paths.append(p)
315+
316+
# Check chain length before signing and verifying
317+
if len(chain_paths) <= 1:
318+
shutil.rmtree(workdir, ignore_errors=True)
319+
return
320+
321+
except (ValueError, TypeError, UnsupportedAlgorithm):
322+
shutil.rmtree(workdir, ignore_errors=True)
323+
return
324+
325+
# 3) Create model files
326+
model_path_dir = tempfile.mkdtemp(prefix="fuzz_model_")
327+
model_path_p = Path(model_path_dir)
328+
created_files = create_fuzz_files(model_path_p, fdp)
329+
if created_files == 0:
330+
return
331+
332+
# Signature output path (we ignore this when signing and verifying)
333+
fname = f"signature-{_rand_utf8(fdp, 3, 12).replace('/', '_')}.sig"
334+
signature_path = model_path_p / fname
335+
336+
# Ignores
337+
ignore_git = fdp.ConsumeBool()
338+
extra_ignores: list[Path] = []
339+
340+
# 4) Sign and 5) Verify
341+
try:
342+
signing.Config().use_certificate_signer(
343+
private_key=leaf_key_path,
344+
signing_certificate=leaf_cert_path,
345+
certificate_chain=chain_paths,
346+
).set_hashing_config(
347+
hashing.Config().set_ignored_paths(
348+
paths=[*list(extra_ignores), signature_path],
349+
ignore_git_paths=ignore_git,
350+
)
351+
).sign(model_path_p, signature_path)
352+
353+
verifying.Config().use_certificate_verifier(
354+
certificate_chain=chain_paths, log_fingerprints=False
355+
).set_hashing_config(
356+
hashing.Config().set_ignored_paths(
357+
paths=[*list(extra_ignores), signature_path],
358+
ignore_git_paths=ignore_git,
359+
)
360+
).verify(model_path_p, signature_path)
361+
362+
finally:
363+
# Always clean up temp dirs
364+
shutil.rmtree(model_path_dir, ignore_errors=True)
365+
shutil.rmtree(workdir, ignore_errors=True)
366+
367+
368+
def main() -> None:
369+
atheris.instrument_all()
370+
atheris.Setup(sys.argv, TestOneInput)
371+
atheris.Fuzz()
372+
373+
374+
if __name__ == "__main__":
375+
main()

0 commit comments

Comments
 (0)