123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # Copyright (c) 2021, Open Source Robotics Foundation
- # All rights reserved.
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are met:
- #
- # * Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- # * Redistributions in binary form must reproduce the above copyright
- # notice, this list of conditions and the following disclaimer in the
- # documentation and/or other materials provided with the distribution.
- # * Neither the name of the Willow Garage, Inc. nor the names of its
- # contributors may be used to endorse or promote products derived from
- # this software without specific prior written permission.
- #
- # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- # POSSIBILITY OF SUCH DAMAGE.
- import os
- from xml.etree import ElementTree
- from . import open_gz_url
- from . import PackageEntry
- from . import RepositoryCacheCollection
- def replace_tokens(string, os_name, os_code_name, os_arch):
- """Replace RPM-specific tokens in the repository base URL."""
- for key, value in {
- '$basearch': os_arch,
- '$distname': os_name,
- '$releasever': os_code_name,
- }.items():
- string = string.replace(key, value)
- return string
- def get_primary_name(repomd_url):
- """Get the URL of the 'primary' metadata from the 'repo' metadata."""
- print('Reading RPM repository metadata from ' + repomd_url)
- with open_gz_url(repomd_url) as f:
- tree = iter(ElementTree.iterparse(f, events=('start', 'end')))
- event, root = next(tree)
- if root.tag != '{http://linux.duke.edu/metadata/repo}repomd':
- raise RuntimeError('Invalid root element in repository metadata: ' + root.tag)
- for event, root_child in tree:
- if (
- root_child.tag != '{http://linux.duke.edu/metadata/repo}data' or
- root_child.attrib.get('type', '') != 'primary'
- ):
- root.clear()
- continue
- for data_child in root_child:
- if (
- data_child.tag != '{http://linux.duke.edu/metadata/repo}location' or
- 'href' not in data_child.attrib
- ):
- root.clear()
- continue
- return data_child.attrib['href']
- root.clear()
- raise RuntimeError('Failed to determine primary data file name')
- def enumerate_rpm_packages(base_url, os_name, os_code_name, os_arch):
- """
- Enumerate packages in an RPM repository.
- :param base_url: the RPM repository base URL.
- :param os_name: the name of the OS associated with the repository.
- :param os_code_name: the OS version associated with the repository.
- :param os_arch: the system architecture associated with the repository.
- :returns: an enumeration of package entries.
- """
- base_url = replace_tokens(base_url, os_name, os_code_name, os_arch)
- repomd_url = os.path.join(base_url, 'repodata', 'repomd.xml')
- primary_xml_name = get_primary_name(repomd_url)
- primary_xml_url = os.path.join(base_url, primary_xml_name)
- print('Reading RPM primary metadata from ' + primary_xml_url)
- with open_gz_url(primary_xml_url) as f:
- tree = ElementTree.iterparse(f)
- for event, element in tree:
- if (
- element.tag != '{http://linux.duke.edu/metadata/common}package' or
- element.attrib.get('type', '') != 'rpm'
- ):
- continue
- pkg_name = None
- pkg_version = None
- pkg_src_name = None
- pkg_url = None
- pkg_provs = []
- for pkg_child in element:
- if pkg_child.tag == '{http://linux.duke.edu/metadata/common}name':
- pkg_name = pkg_child.text
- elif pkg_child.tag == '{http://linux.duke.edu/metadata/common}version':
- pkg_version = pkg_child.attrib.get('ver')
- if pkg_version:
- pkg_epoch = pkg_child.attrib.get('epoch', '0')
- if pkg_epoch != '0':
- pkg_version = pkg_epoch + ':' + pkg_version
- pkg_rel = pkg_child.attrib.get('rel')
- if pkg_rel:
- pkg_version = pkg_version + '-' + pkg_rel
- elif pkg_child.tag == '{http://linux.duke.edu/metadata/common}location':
- pkg_href = pkg_child.attrib.get('href')
- if pkg_href:
- pkg_url = os.path.join(base_url, pkg_href)
- elif pkg_child.tag == '{http://linux.duke.edu/metadata/common}format':
- for format_child in pkg_child:
- if format_child.tag == '{http://linux.duke.edu/metadata/rpm}sourcerpm':
- if format_child.text:
- pkg_src_name = '-'.join(format_child.text.split('-')[:-2])
- if format_child.tag != '{http://linux.duke.edu/metadata/rpm}provides':
- continue
- for provides in format_child:
- if (
- provides.tag != '{http://linux.duke.edu/metadata/rpm}entry' or
- 'name' not in provides.attrib
- ):
- continue
- prov_version = None
- if provides.attrib.get('flags', '') == 'EQ':
- prov_version = provides.attrib.get('ver')
- if prov_version:
- prov_epoch = provides.attrib.get('epoch', '0')
- if prov_epoch != '0':
- prov_version = prov_epoch + ':' + prov_version
- prov_rel = provides.attrib.get('rel')
- if prov_rel:
- prov_version = prov_version + '-' + prov_rel
- pkg_provs.append((provides.attrib['name'], prov_version))
- yield PackageEntry(pkg_name, pkg_version, pkg_url, pkg_src_name)
- for prov_name, prov_version in pkg_provs:
- yield PackageEntry(prov_name, prov_version, pkg_url, pkg_src_name, pkg_name)
- element.clear()
- def rpm_base_url(base_url):
- """
- Create an enumerable cache for an RPM repository.
- :param base_url: the URL of the RPM repository.
- :returns: an enumerable repository cache instance.
- """
- return RepositoryCacheCollection(
- lambda os_name, os_code_name, os_arch:
- enumerate_rpm_packages(base_url, os_name, os_code_name, os_arch))
|