hash-updater: apply formatter

Max Gautier
2025-01-15 14:34:48 +01:00
parent d8629b8e7e
commit bc36e9d440


@@ -29,6 +29,7 @@ CHECKSUMS_YML = Path("roles/kubespray-defaults/defaults/main/checksums.yml")
 logger = logging.getLogger(__name__)
 def open_yaml(file: Path):
     yaml = YAML()
     yaml.explicit_start = True
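
For reference, a minimal sketch of the round-trip YAML setup that open_yaml configures, assuming the YAML() in use is ruamel.yaml's (whose API matches the explicit_start attribute shown above); the data passed to dump here is illustrative only:

import sys
from ruamel.yaml import YAML

yaml = YAML()  # round-trip mode: preserves comments and key order
yaml.explicit_start = True  # emit a leading "---" document marker
# hypothetical data, for illustration only
yaml.dump({"etcd_binary_checksums": {"amd64": {"3.5.0": 0}}}, sys.stdout)
# ---
# etcd_binary_checksums:
#   amd64:
#     3.5.0: 0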
@@ -60,45 +61,55 @@ arch_alt_name = {
 # different verification methods (gpg, cosign) ( needs download role changes) (or verify the sig in this script and only use the checksum in the playbook)
 # perf improvements (async)
 def download_hash(downloads: {str: {str: Any}}) -> None:
     # Handle file with multiples hashes, with various formats.
     # the lambda is expected to produce a dictionary of hashes indexed by arch name
     download_hash_extract = {
-        "calicoctl_binary": lambda hashes : {
-            line.split('-')[-1] : line.split()[0]
-            for line in hashes.strip().split('\n')
-            if line.count('-') == 2 and line.split('-')[-2] == "linux"
+        "calicoctl_binary": lambda hashes: {
+            line.split("-")[-1]: line.split()[0]
+            for line in hashes.strip().split("\n")
+            if line.count("-") == 2 and line.split("-")[-2] == "linux"
         },
-        "etcd_binary": lambda hashes : {
-            line.split('-')[-1].removesuffix('.tar.gz') : line.split()[0]
-            for line in hashes.strip().split('\n')
-            if line.split('-')[-2] == "linux"
+        "etcd_binary": lambda hashes: {
+            line.split("-")[-1].removesuffix(".tar.gz"): line.split()[0]
+            for line in hashes.strip().split("\n")
+            if line.split("-")[-2] == "linux"
         },
-        "nerdctl_archive": lambda hashes : {
-            line.split()[1].removesuffix('.tar.gz').split('-')[3] : line.split()[0]
-            for line in hashes.strip().split('\n')
-            if [x for x in line.split(' ') if x][1].split('-')[2] == "linux"
+        "nerdctl_archive": lambda hashes: {
+            line.split()[1].removesuffix(".tar.gz").split("-")[3]: line.split()[0]
+            for line in hashes.strip().split("\n")
+            if [x for x in line.split(" ") if x][1].split("-")[2] == "linux"
         },
-        "runc": lambda hashes : {
-            parts[1].split('.')[1] : parts[0]
-            for parts in (line.split()
-                          for line in hashes.split('\n')[3:9])
+        "runc": lambda hashes: {
+            parts[1].split(".")[1]: parts[0]
+            for parts in (line.split() for line in hashes.split("\n")[3:9])
         },
-        "yq": lambda rhashes_bsd : {
-            pair[0].split('_')[-1] : pair[1]
+        "yq": lambda rhashes_bsd: {
+            pair[0].split("_")[-1]: pair[1]
             # pair = (yq_<os>_<arch>, <hash>)
-            for pair in ((line.split()[1][1:-1], line.split()[3])
-                         for line in rhashes_bsd.splitlines()
-                         if line.startswith("SHA256"))
+            for pair in (
+                (line.split()[1][1:-1], line.split()[3])
+                for line in rhashes_bsd.splitlines()
+                if line.startswith("SHA256")
+            )
             if pair[0].startswith("yq")
-            and pair[0].split('_')[1] == "linux"
+            and pair[0].split("_")[1] == "linux"
             and not pair[0].endswith(".tar.gz")
         },
     }
-    checksums_file = Path(subprocess.Popen(['git', 'rev-parse', '--show-toplevel'],
-                                           stdout=subprocess.PIPE).communicate()[0].rstrip().decode('utf-8')
-                          ) / CHECKSUMS_YML
+    checksums_file = (
+        Path(
+            subprocess.Popen(
+                ["git", "rev-parse", "--show-toplevel"], stdout=subprocess.PIPE
+            )
+            .communicate()[0]
+            .rstrip()
+            .decode("utf-8")
+        )
+        / CHECKSUMS_YML
+    )
     logger.info("Opening checksums file %s...", checksums_file)
     data, yaml = open_yaml(checksums_file)
     s = requests.Session()
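
As an illustration of what these extraction lambdas do: each one normalizes an upstream checksum-file format into an {arch: hash} dict. The "yq" extractor, for instance, parses BSD-style "SHA256 (<file>) = <hash>" lines; a self-contained sketch using the lambda body from this hunk, with placeholder hashes:

extract_yq = lambda rhashes_bsd: {
    pair[0].split("_")[-1]: pair[1]
    # pair = (yq_<os>_<arch>, <hash>)
    for pair in (
        (line.split()[1][1:-1], line.split()[3])
        for line in rhashes_bsd.splitlines()
        if line.startswith("SHA256")
    )
    if pair[0].startswith("yq")
    and pair[0].split("_")[1] == "linux"
    and not pair[0].endswith(".tar.gz")
}

# placeholder hashes, not real yq checksums
sample = """SHA256 (yq_linux_amd64) = 0000aaaa
SHA256 (yq_linux_arm64) = 0000bbbb
SHA256 (yq_linux_amd64.tar.gz) = 0000cccc
SHA256 (yq_darwin_amd64) = 0000dddd"""
print(extract_yq(sample))  # {'amd64': '0000aaaa', 'arm64': '0000bbbb'}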
@@ -106,33 +117,40 @@ def download_hash(downloads: {str: {str: Any}}) -> None:
     @cache
     def _get_hash_by_arch(download: str, version: str) -> {str: str}:
-        hash_file = s.get(downloads[download]['url'].format(
-            version = version,
-            os = "linux",
+        hash_file = s.get(
+            downloads[download]["url"].format(
+                version=version,
+                os="linux",
             ),
-            allow_redirects=True)
+            allow_redirects=True,
+        )
         hash_file.raise_for_status()
         return download_hash_extract[download](hash_file.content.decode())
-    releases, tags = map(dict, partition(lambda r: r[1].get('tags', False), downloads.items()))
+    releases, tags = map(
+        dict, partition(lambda r: r[1].get("tags", False), downloads.items())
+    )
     repos = {
-        'with_releases': [r['graphql_id'] for r in releases.values()],
-        'with_tags': [t['graphql_id'] for t in tags.values()],
+        "with_releases": [r["graphql_id"] for r in releases.values()],
+        "with_tags": [t["graphql_id"] for t in tags.values()],
     }
-    response = s.post("https://api.github.com/graphql",
-                      json={'query': files(__package__).joinpath('list_releases.graphql').read_text(),
-                            'variables': repos},
-                      headers={
-                          "Authorization": f"Bearer {os.environ['API_KEY']}",
-                      }
-                      )
-    if 'x-ratelimit-used' in response.headers._store:
-        logger.info("Github graphQL API ratelimit status: used %s of %s. Next reset at %s",
-                    response.headers['X-RateLimit-Used'],
-                    response.headers['X-RateLimit-Limit'],
-                    datetime.fromtimestamp(int(response.headers["X-RateLimit-Reset"]))
-                    )
+    response = s.post(
+        "https://api.github.com/graphql",
+        json={
+            "query": files(__package__).joinpath("list_releases.graphql").read_text(),
+            "variables": repos,
+        },
+        headers={
+            "Authorization": f"Bearer {os.environ['API_KEY']}",
+        },
+    )
+    if "x-ratelimit-used" in response.headers._store:
+        logger.info(
+            "Github graphQL API ratelimit status: used %s of %s. Next reset at %s",
+            response.headers["X-RateLimit-Used"],
+            response.headers["X-RateLimit-Limit"],
+            datetime.fromtimestamp(int(response.headers["X-RateLimit-Reset"])),
+        )
     response.raise_for_status()
     def valid_version(possible_version: str) -> Optional[Version]:
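
A note on the releases/tags split above: it relies on partition yielding (items failing the predicate, items passing it), which matches more_itertools.partition. A sketch under that assumption, with illustrative entries (the graphql_id values are made up):

from more_itertools import partition

downloads = {
    "etcd_binary": {"graphql_id": "R_etcd"},                   # queried via GitHub releases
    "gvisor_runsc": {"graphql_id": "R_gvisor", "tags": True},  # queried via git tags
}
releases, tags = map(
    dict, partition(lambda r: r[1].get("tags", False), downloads.items())
)
print(list(releases))  # ['etcd_binary']
print(list(tags))      # ['gvisor_runsc']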
@@ -140,57 +158,76 @@ def download_hash(downloads: {str: {str: Any}}) -> None:
             return Version(possible_version)
         except InvalidVersion:
             return None
     repos = response.json()["data"]
-    github_versions = dict(zip(chain(releases.keys(), tags.keys()),
-        [
-            {
-                v for r in repo["releases"]["nodes"]
-                if not r["isPrerelease"]
-                and (v := valid_version(r["tagName"])) is not None
-            }
-            for repo in repos["with_releases"]
-        ] +
-        [
-            { v for t in repo["refs"]["nodes"]
-             if (v := valid_version(t["name"].removeprefix('release-'))) is not None
-            }
-            for repo in repos["with_tags"]
-        ],
-        strict=True))
+    github_versions = dict(
+        zip(
+            chain(releases.keys(), tags.keys()),
+            [
+                {
+                    v
+                    for r in repo["releases"]["nodes"]
+                    if not r["isPrerelease"]
+                    and (v := valid_version(r["tagName"])) is not None
+                }
+                for repo in repos["with_releases"]
+            ]
+            + [
+                {
+                    v
+                    for t in repo["refs"]["nodes"]
+                    if (v := valid_version(t["name"].removeprefix("release-")))
+                    is not None
+                }
+                for repo in repos["with_tags"]
+            ],
+            strict=True,
+        )
+    )
     components_supported_arch = {
-        component.removesuffix('_checksums'): [a for a in archs.keys()]
+        component.removesuffix("_checksums"): [a for a in archs.keys()]
         for component, archs in data.items()
     }
     new_versions = {
-        c:
-        {v for v in github_versions[c]
-         if any(v > version
-                and (
-                    (v.major, v.minor) == (version.major, version.minor)
-                    or c.startswith('gvisor')
-                )
-                for version in [max(minors) for _, minors in groupby(cur_v, lambda v: (v.minor, v.major))]
-                )
-         # only get:
-         # - patch versions (no minor or major bump) (exception for gvisor which does not have a major.minor.patch scheme
-         # - newer ones (don't get old patch version)
-         }
+        c: {
+            v
+            for v in github_versions[c]
+            if any(
+                v > version
+                and (
+                    (v.major, v.minor) == (version.major, version.minor)
+                    or c.startswith("gvisor")
+                )
+                for version in [
+                    max(minors)
+                    for _, minors in groupby(cur_v, lambda v: (v.minor, v.major))
+                ]
+            )
+            # only get:
+            # - patch versions (no minor or major bump) (exception for gvisor which does not have a major.minor.patch scheme
+            # - newer ones (don't get old patch version)
+        }
         - set(cur_v)
         for component, archs in data.items()
-        if (c := component.removesuffix('_checksums')) in downloads.keys()
+        if (c := component.removesuffix("_checksums")) in downloads.keys()
         # this is only to bound cur_v in the scope
-        and (cur_v := sorted(Version(str(k)) for k in next(archs.values().__iter__()).keys()))
+        and (
+            cur_v := sorted(
+                Version(str(k)) for k in next(archs.values().__iter__()).keys()
+            )
+        )
     }
     hash_set_to_0 = {
         c: {
-            Version(str(v)) for v, h in chain.from_iterable(a.items() for a in archs.values())
-            if h == 0
+            Version(str(v))
+            for v, h in chain.from_iterable(a.items() for a in archs.values())
+            if h == 0
         }
         for component, archs in data.items()
-        if (c := component.removesuffix('_checksums')) in downloads.keys()
+        if (c := component.removesuffix("_checksums")) in downloads.keys()
     }
     def get_hash(component: str, version: Version, arch: str):
         if component in download_hash_extract:
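
For clarity, the new_versions comprehension above keeps only patch releases newer than the latest already-tracked patch of an existing major.minor series (gvisor excepted, since it has no major.minor.patch scheme). A self-contained sketch of that selection rule, with illustrative version numbers:

from itertools import groupby
from packaging.version import Version

cur_v = sorted(Version(v) for v in ["1.28.0", "1.28.1", "1.29.0"])
latest_patches = [
    max(minors) for _, minors in groupby(cur_v, lambda v: (v.minor, v.major))
]  # -> [1.28.1, 1.29.0]
github = {Version(v) for v in ["1.28.1", "1.28.2", "1.29.1", "1.30.0"]}
new = {
    v
    for v in github
    if any(
        v > version and (v.major, v.minor) == (version.major, version.minor)
        for version in latest_patches
    )
} - set(cur_v)
print(sorted(new))  # [<Version('1.28.2')>, <Version('1.29.1')>] -- 1.30.0 is a new minor, skipped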
@@ -198,37 +235,38 @@ def download_hash(downloads: {str: {str: Any}}) -> None:
             return hashes[arch]
         else:
             hash_file = s.get(
-                downloads[component]['url'].format(
-                    version = version,
-                    os = "linux",
-                    arch = arch,
-                    alt_arch = arch_alt_name[arch],
+                downloads[component]["url"].format(
+                    version=version,
+                    os="linux",
+                    arch=arch,
+                    alt_arch=arch_alt_name[arch],
                 ),
-                allow_redirects=True)
+                allow_redirects=True,
+            )
             hash_file.raise_for_status()
-            if downloads[component].get('binary', False):
+            if downloads[component].get("binary", False):
                 return hashlib.new(
-                    downloads[component].get('hashtype', 'sha256'),
-                    hash_file.content
+                    downloads[component].get("hashtype", "sha256"), hash_file.content
                 ).hexdigest()
-            return (hash_file.content.decode().split()[0])
+            return hash_file.content.decode().split()[0]
     for component, versions in chain(new_versions.items(), hash_set_to_0.items()):
-        c = component + '_checksums'
+        c = component + "_checksums"
         for arch in components_supported_arch[component]:
             for version in versions:
-                data[c][arch][str(version)] = f"{downloads[component].get('hashtype', 'sha256')}:{get_hash(component, version, arch)}"
+                data[c][arch][
+                    str(version)
+                ] = f"{downloads[component].get('hashtype', 'sha256')}:{get_hash(component, version, arch)}"
-        data[c] = {arch :
-                   {v :
-                    versions[v] for v in sorted(versions.keys(),
-                                                key=lambda v: Version(str(v)),
-                                                reverse=True)
-                    }
-                   for arch, versions in data[c].items()
-                   }
+        data[c] = {
+            arch: {
+                v: versions[v]
+                for v in sorted(
+                    versions.keys(), key=lambda v: Version(str(v)), reverse=True
+                )
+            }
+            for arch, versions in data[c].items()
+        }
     with open(checksums_file, "w") as checksums_yml:
         yaml.dump(data, checksums_yml)
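
Aside: the final rebuild of data[c] only re-orders keys so each arch maps versions newest-first, relying on Python's dict insertion order. A sketch with placeholder hashes:

from packaging.version import Version

arch_map = {
    "amd64": {"1.28.0": "sha256:aaa", "1.30.0": "sha256:ccc", "1.29.0": "sha256:bbb"}
}
arch_map = {
    arch: {
        v: versions[v]
        for v in sorted(versions.keys(), key=lambda v: Version(str(v)), reverse=True)
    }
    for arch, versions in arch_map.items()
}
print(list(arch_map["amd64"]))  # ['1.30.0', '1.29.0', '1.28.0']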
@@ -238,9 +276,10 @@ def download_hash(downloads: {str: {str: Any}}) -> None:
 def main():
     logging.basicConfig(stream=sys.stdout, level=logging.INFO)
-    parser = argparse.ArgumentParser(description=f"Add new patch versions hashes in {CHECKSUMS_YML}",
-                                     formatter_class=argparse.RawTextHelpFormatter,
-                                     epilog=f"""
+    parser = argparse.ArgumentParser(
+        description=f"Add new patch versions hashes in {CHECKSUMS_YML}",
+        formatter_class=argparse.RawTextHelpFormatter,
+        epilog=f"""
 This script only lookup new patch versions relative to those already existing
 in the data in {CHECKSUMS_YML},
 which means it won't add new major or minor versions.
@@ -259,8 +298,7 @@ def main():
     amd64:
 +     1.30.0: 0
       1.29.0: d16a1ffb3938f5a19d5c8f45d363bd091ef89c0bc4d44ad16b933eede32fdcbb
-      1.28.0: 8dc78774f7cbeaf787994d386eec663f0a3cf24de1ea4893598096cb39ef2508"""
-                                     )
+      1.28.0: 8dc78774f7cbeaf787994d386eec663f0a3cf24de1ea4893598096cb39ef2508""",
+    )
     # Workaround for https://github.com/python/cpython/issues/53834#issuecomment-2060825835
@@ -273,14 +311,25 @@ def main():
         def __contains__(self, item):
             return super().__contains__(item) or item == self.default
-    choices = Choices(downloads.keys(), default=list(downloads.keys()))
-    parser.add_argument('only', nargs='*', choices=choices,
-                        help='if provided, only obtain hashes for these compoments',
-                        default=choices.default)
-    parser.add_argument('-e', '--exclude', action='append', choices=downloads.keys(),
-                        help='do not obtain hashes for this component',
-                        default=[])
+    choices = Choices(components.infos.keys(), default=list(components.infos.keys()))
+    parser.add_argument(
+        "only",
+        nargs="*",
+        choices=choices,
+        help="if provided, only obtain hashes for these compoments",
+        default=choices.default,
+    )
+    parser.add_argument(
+        "-e",
+        "--exclude",
+        action="append",
+        choices=components.infos.keys(),
+        help="do not obtain hashes for this component",
+        default=[],
+    )
     args = parser.parse_args()
-    download_hash({k: downloads[k] for k in (set(args.only) - set(args.exclude))})
+    download_hash(
+        {k: components.infos[k] for k in (set(args.only) - set(args.exclude))}
+    )
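
For context, the Choices class referenced above works around argparse validating the list default itself against choices when nargs="*" is used (the linked cpython issue). A standalone sketch of the pattern; the list base class and __init__ are assumptions, since the commit only shows __contains__:

import argparse

class Choices(list):
    def __init__(self, iterable, default):
        super().__init__(iterable)
        self.default = default

    def __contains__(self, item):
        # accept either a single valid choice or the default list itself,
        # which argparse checks against choices when no arguments are given
        return super().__contains__(item) or item == self.default

parser = argparse.ArgumentParser()
choices = Choices(["etcd_binary", "runc"], default=["etcd_binary", "runc"])
parser.add_argument("only", nargs="*", choices=choices, default=choices.default)
print(parser.parse_args([]).only)        # ['etcd_binary', 'runc']
print(parser.parse_args(["runc"]).only)  # ['runc']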