Skip to content

Commit 70c7acf

Browse files
committed
Upload SBOM with privacy and type
Problem: Uploading an sbom can specify the type and the privacy setting. Solution: Added params optional argument that specifies a dict of "sbomType" and "privacy". Additionally added confim=True option to upload method to ensure that the metadata is written to the database before return. Upgraded coverage.py to version 6.2. Signed-off-by: Paul Hewlett <phewlett76@gmail.com>
1 parent bc25471 commit 70c7acf

12 files changed

Lines changed: 283 additions & 49 deletions

File tree

archivist/archivist.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,13 @@ def post(
355355

356356
@retry_429
357357
def post_file(
358-
self, path: str, fd: BinaryIO, mtype: str, *, form: Optional[str] = "file"
358+
self,
359+
path: str,
360+
fd: BinaryIO,
361+
mtype: str,
362+
*,
363+
form: Optional[str] = "file",
364+
params: Optional[Dict] = None,
359365
) -> Dict:
360366
"""POST method (REST) - upload binary
361367
@@ -365,6 +371,7 @@ def post_file(
365371
path (str): e.g. v2/assets
366372
fd : iterable representing the contents of a file.
367373
mtype (str): mime type e.g. image/jpg
374+
params (dict): dictiuonary of optional path params
368375
369376
Returns:
370377
dict representing the response body (entity).
@@ -379,6 +386,10 @@ def post_file(
379386
headers = {
380387
"content-type": multipart.content_type,
381388
}
389+
if params:
390+
qry = "&".join(sorted(f"{k}={v}" for k, v in _dotstring(params)))
391+
path = "?".join((path, qry))
392+
382393
response = self._session.post(
383394
SEP.join((self.url, ROOT, path)),
384395
data=multipart, # type: ignore https://github.com/requests/toolbelt/issues/312

archivist/confirmer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def _wait_for_confirmation(
7979
on_giveup=__on_giveup_confirmation,
8080
)
8181
def _wait_for_confirmation(self, identity):
82-
"""docstring"""
82+
"""Return None until entity is confirmed"""
8383
entity = self.read(identity)
8484

8585
if CONFIRMATION_STATUS not in entity:
@@ -115,7 +115,7 @@ def __on_giveup_confirmed(details):
115115
on_giveup=__on_giveup_confirmed,
116116
)
117117
def _wait_for_confirmed(self, *, props=None, **kwargs) -> bool:
118-
"""docstring"""
118+
"""Return False until all entities are confirmed"""
119119

120120
# look for unconfirmed entities
121121
newprops = deepcopy(props) if props else {}

archivist/publisher.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
import logging
77

8-
from typing import overload
9-
108
import backoff
119

1210
from .errors import ArchivistUnpublishedError
@@ -43,19 +41,6 @@ def __on_giveup_publication(details):
4341
)
4442

4543

46-
# These overloads are used for type hinting, if self is sboms client then
47-
# an SBOM metadata will be returned.
48-
# Overloads are evaluated at startup but not at runtime, therefore
49-
# no test coverage be done directly.
50-
51-
52-
@overload
53-
def _wait_for_publication(
54-
self: "sboms._SbomsClient", identity: str
55-
) -> "sbommetadata.SBOM":
56-
... # pragma: no cover
57-
58-
5944
@backoff.on_predicate(
6045
backoff.expo,
6146
logger=LOGGER,
@@ -64,7 +49,7 @@ def _wait_for_publication(
6449
on_giveup=__on_giveup_publication,
6550
)
6651
def _wait_for_publication(self, identity):
67-
"""docstring"""
52+
"""Return None until published date is set"""
6853
entity = self.read(identity)
6954

7055
if entity.published_date:

archivist/sboms.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
SBOMS_WITHDRAW,
4444
SBOMS_PUBLISH,
4545
)
46-
from . import publisher, withdrawer
46+
from . import publisher, uploader, withdrawer
4747
from .dictmerge import _deepmerge
4848
from .sbommetadata import SBOM
4949

@@ -66,29 +66,60 @@ class _SBOMSClient:
6666
def __init__(self, archivist: "type_helper.Archivist"):
6767
self._archivist = archivist
6868

69-
def upload(self, fd: BinaryIO, *, mtype: str = "text/xml") -> SBOM:
69+
def upload(
70+
self,
71+
fd: BinaryIO,
72+
*,
73+
confirm: bool = False,
74+
mtype: str = "text/xml",
75+
params: Optional[Dict] = None,
76+
) -> SBOM:
7077
"""Create SBOM
7178
7279
Creates SBOM from opened file or other data source.
7380
7481
Args:
7582
fd (file): opened file descriptor or other file-type iterable.
83+
confirm (bool): if True wait for sbom to be uploaded.
7684
mtype (str): mimetype of data.
85+
params (dict): optional e.g. {"sbomType": "cyclonedx-xml", "privacy": "PUBLIC" }
7786
7887
Returns:
7988
:class:`SBOM` instance
8089
8190
"""
8291

8392
LOGGER.debug("Upload SBOM")
84-
return SBOM(
93+
94+
sbom = SBOM(
8595
**self._archivist.post_file(
8696
f"{SBOMS_SUBPATH}/{SBOMS_LABEL}",
8797
fd,
8898
mtype,
8999
form="sbom",
100+
params=params,
90101
)
91102
)
103+
if not confirm:
104+
return sbom
105+
106+
return self.wait_for_uploading(sbom.identity)
107+
108+
def wait_for_uploading(self, identity: str) -> SBOM:
109+
"""Wait for sbom to be uploaded.
110+
111+
Waits for sbom to be uploaded.
112+
113+
Args:
114+
identity (str): identity of sbom
115+
116+
Returns:
117+
True if sbom is uploaded.
118+
119+
"""
120+
uploader.MAX_TIME = self._archivist.max_time
121+
# pylint: disable=protected-access
122+
return uploader._wait_for_uploading(self, identity) # type: ignore
92123

93124
def download(self, identity: str, fd: BinaryIO) -> Response:
94125
"""Read SBOM
@@ -212,7 +243,7 @@ def wait_for_publication(self, identity: str) -> SBOM:
212243
"""
213244
publisher.MAX_TIME = self._archivist.max_time
214245
# pylint: disable=protected-access
215-
return publisher._wait_for_publication(self, identity)
246+
return publisher._wait_for_publication(self, identity) # type: ignore
216247

217248
def withdraw(self, identity: str, confirm: bool = False) -> SBOM:
218249
"""Withdraw SBOM
@@ -254,4 +285,4 @@ def wait_for_withdrawn(self, identity: str) -> SBOM:
254285
"""
255286
withdrawer.MAX_TIME = self._archivist.max_time
256287
# pylint: disable=protected-access
257-
return withdrawer._wait_for_withdrawn(self, identity)
288+
return withdrawer._wait_for_withdrawn(self, identity) # type: ignore

archivist/uploader.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""uploader interface
2+
"""
3+
4+
import logging
5+
6+
import backoff
7+
8+
from .errors import ArchivistNotFoundError
9+
10+
11+
# pylint:disable=unused-import # To prevent cyclical import errors forward referencing is used
12+
# pylint:disable=cyclic-import # but pylint doesn't understand this feature
13+
from . import sboms
14+
15+
MAX_TIME = 1200
16+
17+
LOGGER = logging.getLogger(__name__)
18+
19+
20+
def __lookup_max_time():
21+
return MAX_TIME
22+
23+
24+
# pylint: disable=consider-using-f-string
25+
def __backoff_handler(details):
26+
LOGGER.debug("MAX_TIME %s", MAX_TIME)
27+
LOGGER.debug(
28+
"Backing off {wait:0.1f} seconds afters {tries} tries "
29+
"calling function {target} with args {args} and kwargs "
30+
"{kwargs}".format(**details)
31+
)
32+
33+
34+
def __on_giveup_uploading(details):
35+
identity = details["args"][1] # first argument to wait_for_uploading
36+
elapsed = details["elapsed"]
37+
raise ArchivistNotFoundError(
38+
f"uploading for {identity} timed out after {elapsed} seconds"
39+
)
40+
41+
42+
@backoff.on_predicate(
43+
backoff.expo,
44+
logger=LOGGER,
45+
max_time=__lookup_max_time,
46+
on_backoff=__backoff_handler,
47+
on_giveup=__on_giveup_uploading,
48+
)
49+
def _wait_for_uploading(self, identity):
50+
"""Return None until identity is found"""
51+
try:
52+
LOGGER.debug("Uploader Read %s", identity)
53+
entity = self.read(identity)
54+
except ArchivistNotFoundError:
55+
return None
56+
57+
return entity

archivist/withdrawer.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
import logging
77

8-
from typing import overload
9-
108
import backoff
119

1210
from .errors import ArchivistUnwithdrawnError
@@ -43,19 +41,6 @@ def __on_giveup_withdrawn(details):
4341
)
4442

4543

46-
# These overloads are used for type hinting, if self is sboms client then
47-
# an SBOM metadata will be returned.
48-
# Overloads are evaluated at startup but not at runtime, therefore
49-
# no test coverage be done directly.
50-
51-
52-
@overload
53-
def _wait_for_publication(
54-
self: "sboms._SbomsClient", identity: str
55-
) -> "sbommetadata.SBOM":
56-
... # pragma: no cover
57-
58-
5944
@backoff.on_predicate(
6045
backoff.expo,
6146
logger=LOGGER,
@@ -64,7 +49,7 @@ def _wait_for_publication(
6449
on_giveup=__on_giveup_withdrawn,
6550
)
6651
def _wait_for_withdrawn(self, identity):
67-
"""docstring"""
52+
"""Return None until withdrawn date is set"""
6853
entity = self.read(identity)
6954

7055
if entity.withdrawn_date:

functests/execapplications.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ def test_appidp_token(self):
174174
def test_archivist_token(self):
175175
"""
176176
Test archivist with client id/secret
177+
WARN: this test takes over 10 minutes
177178
"""
179+
print("This test takes over 10 minutes...")
178180
application = self.arch.applications.create(
179181
self.display_name,
180182
CUSTOM_CLAIMS,
@@ -187,6 +189,7 @@ def test_archivist_token(self):
187189
)
188190

189191
# archivist using app registration
192+
print("New Arch")
190193
new_arch = Archivist(
191194
environ["TEST_ARCHIVIST"],
192195
(application["client_id"], application["credentials"][0]["secret"]),
@@ -198,6 +201,9 @@ def test_archivist_token(self):
198201
traffic_light = deepcopy(ATTRS)
199202
traffic_light["arc_display_type"] = "Traffic light with violation camera"
200203
asset = new_arch.assets.create(
204+
props={
205+
"proof_mechanism": ProofMechanism.SIMPLE_HASH.name,
206+
},
201207
attrs=traffic_light,
202208
confirm=True,
203209
)

functests/execsboms.py

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,60 @@ def tearDown(cls) -> None:
5757
with suppress(FileNotFoundError):
5858
remove(TEST_SBOM_DOWNLOAD_PATH)
5959

60+
def test_sbom_upload_with_public_privacy(self):
61+
"""
62+
Test sbom upload with privacy
63+
"""
64+
now = now_timestamp()
65+
print("Title:", self.title, now)
66+
with open(TEST_SBOM_PATH, "rb") as fd:
67+
metadata = self.arch.sboms.upload(
68+
fd, confirm=True, params={"privacy": "PUBLIC"}
69+
)
70+
print("first upload", json_dumps(metadata.dict(), indent=4))
71+
identity = metadata.identity
72+
73+
metadata1 = self.arch.sboms.read(identity)
74+
print("read", json_dumps(metadata1.dict(), indent=4))
75+
self.assertEqual(
76+
metadata,
77+
metadata1,
78+
msg="Metadata not correct",
79+
)
80+
81+
def test_sbom_upload_with_confirmation(self):
82+
"""
83+
Test sbom upload with confirmation
84+
"""
85+
now = now_timestamp()
86+
print("Title:", self.title, now)
87+
with open(TEST_SBOM_PATH, "rb") as fd:
88+
metadata = self.arch.sboms.upload(fd, confirm=True)
89+
print("first upload", json_dumps(metadata.dict(), indent=4))
90+
identity = metadata.identity
91+
92+
metadata1 = self.arch.sboms.read(identity)
93+
print("read", json_dumps(metadata1.dict(), indent=4))
94+
self.assertEqual(
95+
metadata,
96+
metadata1,
97+
msg="Metadata not correct",
98+
)
99+
100+
sleep(1) # the data may have not reached cogsearch
101+
metadatas = list(self.arch.sboms.list(metadata={"uploaded_since": now}))
102+
self.assertEqual(
103+
len(metadatas),
104+
1,
105+
msg="No. of SBOMS should be 1",
106+
)
107+
60108
def test_sbom_upload_and_download(self):
61109
"""
62110
Test sbom upload and download through the SDK
63111
"""
64-
print("Title:", self.title)
65112
now = now_timestamp()
113+
print("Title:", self.title, now)
66114
with open(TEST_SBOM_PATH, "rb") as fd:
67115
metadata = self.arch.sboms.upload(fd)
68116

@@ -83,11 +131,8 @@ def test_sbom_upload_and_download(self):
83131
msg="Metadata not correct",
84132
)
85133

86-
sleep(1) # otherwise test fails
134+
sleep(1) # the data may have not reached cogsearch
87135
metadatas = list(self.arch.sboms.list(metadata={"uploaded_since": now}))
88-
for i, m in enumerate(metadatas):
89-
print(i, ":", json_dumps(m.dict(), indent=4))
90-
91136
self.assertEqual(
92137
len(metadatas),
93138
1,
@@ -99,6 +144,9 @@ def test_sbom_upload_and_download(self):
99144
msg="Metadata not correct",
100145
)
101146

147+
for i, m in enumerate(metadatas):
148+
print(i, ":", json_dumps(m.dict(), indent=4))
149+
102150
metadata2 = self.arch.sboms.publish(identity, confirm=True)
103151
print("publish", json_dumps(metadata2.dict(), indent=4))
104152
self.assertNotEqual(
@@ -135,7 +183,6 @@ def test_sbom_upload_and_download(self):
135183
msg="Withdrawn_date not correct",
136184
)
137185

138-
sleep(1) # otherwise test fails
139186
metadatas = list(
140187
self.arch.sboms.list(
141188
page_size=50,

0 commit comments

Comments
 (0)