#!/usr/bin/python3
# -*- mode: python; coding: utf-8 -*-
#
##########################################################################
# generate-application-services - Download application services from Deb repositories
# Copyright © 2019 Pôle de compétences EOLE <eole@ac-dijon.fr>
# Author: Daniel Dehennin <daniel.dehennin@ac-dijon.fr>
#
# License AGPL-3:
# Application
# Copyright (C) 2019 Équipe EOLE <eole@ac-dijon.fr>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
##########################################################################

"""Generate Zéphir2 application services for EOLE and Envole

SYNOPSIS
========

    generate-application-services --release 2.7.0 --devel --verbose

    generate-application-services --release 2.6.0,2.6.1,2.6.2

    generate-application-services --release all

    generate-application-services --release 2.5.2 --out-dir /tmp/2.5.2/

"""

import argparse
import logging
import os
import re
import shutil
import hashlib

import requests

from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import PreservedScalarString

from debian import deb822
from debian import debfile

# Created at import time so that every helper can log even when it is
# called without going through main(); main() only configures the handlers.
logger = logging.getLogger('generate-application-services')

# Envole release to use for each EOLE release
# pylint: disable=bad-continuation
EOLE_ENVOLE_MAPPING = {'2.4.0': '4',
                       '2.4.1': '4',
                       '2.4.2': '4',
                       '2.5.0': '4',
                       '2.5.1': '4',
                       '2.5.2': '5',
                       '2.6.0': '5',
                       '2.6.1': '6',
                       '2.6.2': '6',
                       '2.7.0': '6',
                       '2.7.1': '6',
                       }

# Stable distribution suffixes
EOLE_SUFFIXES = ['', '-updates', '-security']

# Map filename prefix with destination directory
# pylint: disable=bad-continuation
FILE_MAP = {'/usr/share/eole/creole/dicos/': 'dictionaries',
            '/usr/share/eole/creole/distrib/': 'templates',
            '/usr/share/eole/creole/extra/': 'extra_dictionaries',
            '/usr/share/creole/funcs/': 'creole_funcs',
            '/usr/share/eole/preservice/': 'preservices',
            '/usr/share/eole/postservice/': 'postservices',
            '/usr/share/eole/pretemplate/': 'pretemplates',
            '/usr/share/eole/posttemplate/': 'posttemplates',
            }

# Match the FILE_MAP prefixes, with an optional leading dot as found in
# archive member names; group 1 is the FILE_MAP key
FILE_RULES = [re.compile(rf'^\.?({path})') for path in FILE_MAP]

# Files whose presence makes a package an application service
SCHEMA_RULES = [re.compile(r'^\.?/usr/share/eole/creole/(dicos|extra).*xml')]

# Packages never turned into an application service (only keys are tested)
# pylint: disable=bad-continuation
EXCLUDED_PACKAGES = {'creole': True,  # Provide only a sample dictionary
                     'eole-zephir': True,  # Zephir can not manage another Zephir
                     ## MTES packages
                     'eole-antivir2': True,
                     'ecdl-outils': True,
                     'eole-ecdl': True,
                     'eole-ecdlannuaire': True,
                     'eole-esbl': True,
                     'eole-esbl-glpi': True,
                     'eole-esbl-grr': True,
                     'eole-esbl-ocs': True,
                     'eole-geo-ide-base': True,
                     'eole-geo-ide-distribution': True,
                     'eole-ocsinventory-agent': True,
                     'eole-wapt': True,
                     'seth-ecologie': True,
                     'supervision-psin': True,
                     ## Envole packages
                     # 'eole-envole-migration':True,
                     }

# Regular expressions (not plain prefixes) of files to ignore.
# The third entry is a raw string: “\.” is not a valid string escape.
# pylint: disable=line-too-long
EXCLUDED_FILES = ['/usr/share/doc',  # All debian packages
                  '/usr/share/eqos',  # eole-eqos
                  r'/usr/share/eole/creole/extra/[^/]+/[^/]+(?!\.xml)/',  # EAD3 actions garbage
                  '/usr/share/envole/envole-themes/',  # eole-envole-themes
                  ]

EXCLUDED_FILES_RULES = [re.compile(rf'^\.?({path})') for path in EXCLUDED_FILES]


def main():
    """Main execution function

    Parse the command line, configure the logging, make sure the
    temporary directory exists and process every requested release.

    """
    options = parse_args()

    logging.basicConfig(level=options.log_level.upper(),
                        style='$',  # format style as string.Template
                        format=options.log_format)

    if options.verbose:
        # Do not get urllib3 logs on verbose
        urllib3_logger = logging.getLogger(name='urllib3.connectionpool')
        urllib3_logger.propagate = False

    logger.debug('Start generate-application-services main procedure with options=“%(options)s”',
                 {'options': options})

    if not os.access(options.temp_dir, os.W_OK):
        logger.debug('Create temporary directory “%(temp_dir)s”',
                     {'temp_dir': options.temp_dir})
        # exist_ok: the directory may exist without being writable, the
        # real permission error will then show up on first write
        os.makedirs(options.temp_dir, mode=0o700, exist_ok=True)

    for release in options.release:
        logger.info('Release “%(release)s”: generate application services',
                    {'release': release})
        loop_distributions(release, options)


def parse_args():
    """Simply parse the command line options

    :return: the parsed options, with `log_level`, `log_format` and
             `release` normalised
    :rtype: `argparse.Namespace`

    """
    parser = argparse.ArgumentParser(description='Download application services')

    parser.add_argument('-r', '--release', nargs='*', required=True, type=str,
                        help='EOLE release')

    target_opt = parser.add_mutually_exclusive_group()
    target_opt.add_argument('-p', '--proposed', action='store_true',
                            help='Use proposed-updates')
    target_opt.add_argument('--testing', action='store_true',
                            help='Use testing distribution')
    target_opt.add_argument('--devel', action='store_true',
                            help='Use devel distribution')

    parser.add_argument('-m', '--mirror', type=str,
                        default='http://eole.ac-dijon.fr',
                        help='Debian package mirror')

    # By default, write next to this script, under seed/release
    current_file = os.path.abspath(__file__)
    seed_dir = os.path.join(os.path.dirname(current_file), 'seed')
    application_services_dir = os.path.join(seed_dir, 'release')

    parser.add_argument('--out-dir', type=str,
                        default=application_services_dir,
                        help='Download directory')

    parser.add_argument('--temp-dir', type=str,
                        default='/tmp/generate-application-services',
                        help='Temporary download directory')

    parser.add_argument('--proxy', type=str, help="HTTP proxy")

    log_opts = parser.add_argument_group('logging')
    log_opts.add_argument('-l', '--log-level',
                          choices=['debug', 'info', 'warning', 'error', 'critical'],
                          default='info',
                          help='Log level')
    log_opts.add_argument('-v', '--verbose', action='store_true',
                          help='Verbose mode')
    log_opts.add_argument('-d', '--debug', action='store_true',
                          help='Debug mode, equivalent to -l debug')

    options = parser.parse_args()

    ##
    ## Mangle options
    ##
    options.log_format = '${message}'
    if options.debug:
        options.log_level = 'debug'
        options.log_format = '${levelname}: ${message}'
    elif options.verbose:
        # Verbose is like debug without log level name prefix and urllib3 messages
        options.log_level = 'debug'

    # Releases can be given space separated (argparse) or comma separated
    # as documented in the synopsis
    options.release = [release
                       for item in options.release
                       for release in item.split(',')
                       if release]

    if 'all' in options.release:
        options.release = sorted(EOLE_ENVOLE_MAPPING)

    return options


def loop_distributions(release, options):
    """Loop on vendors distributions

    Build the list of distributions to look at for each vendor, then
    generate and finalise the application services of the release.

    :param str release: EOLE release
    :param Namespace options: options passed to the script

    """
    logger.debug('Release “%(release)s”: process all the distributions',
                 {'release': release})

    vendor_map = {'eole': {}, 'envole': {}}

    # Development distributions are named after the major.minor version only
    eole_version = '.'.join(release.split('.')[0:2])

    if options.devel:
        vendor_map['eole'][release] = [f'eole-{eole_version}-unstable']
    elif options.testing:
        vendor_map['eole'][release] = [f'eole-{eole_version}-testing']
    else:
        suffixes = EOLE_SUFFIXES.copy()
        if options.proposed:
            suffixes.append('-proposed-updates')
        vendor_map['eole'][release] = [f'eole-{release}{suffix}'
                                       for suffix in suffixes]

    envole_release = EOLE_ENVOLE_MAPPING.get(release)
    if envole_release:
        if options.devel:
            envole_distribution = f'envole-{envole_release}-unstable'
        elif options.testing or options.proposed:
            envole_distribution = f'envole-{envole_release}-testing'
        else:
            envole_distribution = f'envole-{envole_release}'
        vendor_map['envole'][envole_release] = [envole_distribution]

    vendor_services = gen_vendor_application_services(vendor_map, release, options)
    finalise_application_services(vendor_services, release, options)


def gen_vendor_application_services(vendor_map, release, options):
    """Generate the application services packages for all the vendors

    :param dict vendor_map: distributions where to look for application services
    :param str release: EOLE release
    :param Namespace options: options passed to the script
    :return: services description indexed by vendor, vendor release
             and service name
    :rtype: dict

    """
    logger.info('Release %(release)s: download the application services for vendors “%(vendors)s”',
                {'release': release, 'vendors': ", ".join(vendor_map.keys())})

    services = {}
    for vendor, vendor_releases in vendor_map.items():
        services[vendor] = {}
        for vendor_release, distributions in vendor_releases.items():
            release_services = services[vendor][vendor_release] = {}
            for distribution in distributions:
                logger.info('Release “%(release)s”: Vendor “%(vendor)s”: Vendor release “%(vendor_release)s”: Distribution “%(distribution)s”: generate application services',
                            {'release': release,
                             'vendor': vendor,
                             'vendor_release': vendor_release,
                             'distribution': distribution})
                packages = get_packages(vendor, distribution, release, options)
                # Later distributions override the services of earlier ones
                release_services.update(extract_application_services(packages,
                                                                     vendor,
                                                                     vendor_release,
                                                                     distribution,
                                                                     release,
                                                                     options))

    return services


def finalise_application_services(vendor_services, release, options):
    """finalise EOLE application services to the output directory

    Copy the temporary application services tree to the output
    directory, split the dependencies between services and packages,
    then write the packages XML schema and the final `service.yml`.

    :param dict vendor_services: services informations per vendor and release
    :param str release: EOLE release
    :param Namespace options: options passed to the script

    """
    log_prefix = f'Release “{release}”:'
    logger.info('%(log_prefix)s copy application service to output directory',
                {'log_prefix': log_prefix})

    yaml = YAML()
    yaml.indent(offset=2)
    yaml.default_flow_style = False

    for vendor, vendor_infos in vendor_services.items():
        vendor_log_prefix = f'{log_prefix} Vendor “{vendor}”:'
        for vendor_release, service_infos in vendor_infos.items():
            release_log_prefix = f'{vendor_log_prefix} Vendor release “{vendor_release}”:'
            vendor_version = '.'.join(vendor_release.split('.')[0:2])
            src_dir = os.path.join(options.temp_dir, 'applicationservice', vendor, vendor_release)
            dst_dir = os.path.join(options.out_dir,
                                   f'{vendor}-{vendor_version}',
                                   vendor_release,
                                   'applicationservice')

            if os.access(dst_dir, os.W_OK):
                logger.debug('%(log_prefix)s delete existing release application services directory “%(dst_dir)s”',
                             {'log_prefix': release_log_prefix, 'dst_dir': dst_dir})
                shutil.rmtree(dst_dir)

            logger.debug('%(log_prefix)s copy release application services directory “%(src_dir)s” to “%(dst_dir)s”',
                         {'log_prefix': release_log_prefix, 'src_dir': src_dir, 'dst_dir': dst_dir})
            # copyfile: do not propagate the permissions of the temporary tree
            shutil.copytree(src_dir,
                            dst_dir,
                            copy_function=shutil.copyfile,
                            symlinks=True,
                            ignore_dangling_symlinks=True)

            for name, infos in service_infos.items():
                service_log_prefix = f'{release_log_prefix} Service “{name}”:'
                logger.debug('%(log_prefix)s finalize service',
                             {'log_prefix': service_log_prefix})
                service_filename = os.path.join(dst_dir, name, 'service.yml')

                if infos['depends']:
                    # Make sure to pass dependencies as first argument to allow recursive calls
                    depends = filter_deps(infos['depends'],
                                          infos,
                                          vendor_services,
                                          vendor,
                                          vendor_release,
                                          release,
                                          options)
                    infos['depends'] = depends['services']
                    infos['packages'] = depends['packages']

                if infos['packages']:
                    package_xml_schema_dir = os.path.join(dst_dir, name, 'dictionaries')
                    package_xml_schema_filename = os.path.join(package_xml_schema_dir,
                                                               f'00_{name}_packages.xml')
                    package_xml_schema = gen_packages_xml_schema(infos, vendor, vendor_release, release)

                    if not os.access(package_xml_schema_dir, os.W_OK):
                        logger.info('%(log_prefix)s create EOLE XML schema directory “%(xml_schema_directory)s”',
                                    {'log_prefix': service_log_prefix,
                                     'xml_schema_directory': package_xml_schema_dir})
                        os.makedirs(package_xml_schema_dir, mode=0o700, exist_ok=True)

                    # The generated XML declares itself as utf-8
                    with open(package_xml_schema_filename, 'w', encoding='utf-8') as schema_fh:
                        logger.info('%(log_prefix)s write packages EOLE XML schema file “%(xml_schema_filename)s”',
                                    {'log_prefix': service_log_prefix,
                                     'xml_schema_filename': package_xml_schema_filename})
                        schema_fh.write(package_xml_schema)
                else:
                    logger.debug('%(log_prefix)s no packages dependency',
                                 {'log_prefix': service_log_prefix})

                with open(service_filename, 'w', encoding='utf-8') as service_fh:
                    logger.info('%(log_prefix)s write service description file “%(service_filename)s”',
                                {'log_prefix': service_log_prefix,
                                 'service_filename': service_filename})
                    yaml.dump(infos, service_fh)


def get_packages(vendor, distribution, release, options):
    """Retrieve the list of binary packages

    Download the amd64 `Packages` index of the distribution and parse it.

    :param str vendor: vendor name
    :param str distribution: distribution name
    :param str release: EOLE release
    :param Namespace options: options passed to the script
    :return: the paragraphs of the `Packages` index
    :rtype: list of `debian.deb822.Packages`
    :raise Exception: when the index can not be downloaded

    """
    log_prefix = f'Release “{release}”: Vendor “{vendor}”: Distribution “{distribution}”:'
    logger.debug('%(log_prefix)s get the binary packages',
                 {'log_prefix': log_prefix})

    index_dir = os.path.join(options.temp_dir, 'indexes')
    if not os.access(index_dir, os.W_OK):
        logger.debug('%(log_prefix)s create temporary directory for indexes “%(index_dir)s”',
                     {'log_prefix': log_prefix, 'index_dir': index_dir})
        os.makedirs(index_dir, mode=0o700, exist_ok=True)

    index_path = os.path.join(index_dir, f'{vendor}-{distribution}.Packages')
    url = os.path.join(options.mirror, vendor, 'dists', distribution, 'main', 'binary-amd64', 'Packages')

    logger.debug('%(log_prefix)s download the package index: “%(url)s” => “%(index_path)s”',
                 {'log_prefix': log_prefix, 'url': url, 'index_path': index_path})
    download_file(url, index_path, proxy=getattr(options, 'proxy', None))

    with open(index_path, 'r', encoding='utf-8') as file_h:
        # Materialise the paragraphs before the file is closed
        return list(deb822.Packages.iter_paragraphs(file_h))


def extract_application_services(packages, vendor, vendor_release, distribution, release, options):
    """Extract the EOLE application services from downloaded binary packages

    Each package is downloaded (or reused when its checksum matches),
    and when it ships an application service schema its files are
    extracted and a first `service.yml` is written.  A failure on one
    package is logged and does not stop the processing of the others.

    :param list packages: packages of a distribution
    :param str vendor: vendor name
    :param str vendor_release: vendor release
    :param str distribution: distribution name
    :param str release: EOLE release
    :param Namespace options: options passed to the script
    :return: description of the successfully generated services,
             indexed by service name
    :rtype: dict

    """
    log_prefix = f'Release “{release}”: Vendor “{vendor}”: Distribution “{distribution}”:'
    base_url = os.path.join(options.mirror, vendor)
    proxy = getattr(options, 'proxy', None)
    services = {}

    yaml = YAML()
    yaml.indent(offset=2)
    yaml.default_flow_style = False

    base_application_service_dir = os.path.join(options.temp_dir, 'applicationservice', vendor, vendor_release)
    if not os.access(base_application_service_dir, os.W_OK):
        logger.debug('%(log_prefix)s create temporary application service base directory “%(base_application_service_dir)s”',
                     {'log_prefix': log_prefix,
                      'base_application_service_dir': base_application_service_dir})
        os.makedirs(base_application_service_dir, mode=0o700, exist_ok=True)

    deb_dir = os.path.join(options.temp_dir, 'debs')
    if not os.access(deb_dir, os.W_OK):
        logger.debug('%(log_prefix)s create temporary directory for deb package file “%(deb_dir)s”',
                     {'log_prefix': log_prefix, 'deb_dir': deb_dir})
        os.makedirs(deb_dir, mode=0o700, exist_ok=True)

    for package in packages:
        package_log_prefix = f'{log_prefix} Package “{package["Package"]}”: Version “{package["Version"]}”:'

        if 'Filename' not in package:
            logger.warning('%(log_prefix)s skip malformed package',
                           {'log_prefix': package_log_prefix})
            continue

        if package['Package'] in EXCLUDED_PACKAGES:
            logger.debug('%(log_prefix)s skip excluded package',
                         {'log_prefix': package_log_prefix})
            continue

        logger.debug('%(log_prefix)s process package',
                     {'log_prefix': package_log_prefix})

        url = os.path.join(base_url, package['Filename'])
        package_path = os.path.join(deb_dir, os.path.basename(url))

        # Best effort: one broken package must not abort the whole release
        try:
            if os.path.isfile(package_path):
                with open(package_path, 'rb') as package_fh:
                    sha256 = hashlib.sha256(package_fh.read()).hexdigest()

                if package['SHA256'].casefold() != sha256.casefold():
                    logger.debug('%(log_prefix)s delete corrupted package file %(package_path)s',
                                 {'log_prefix': package_log_prefix, 'package_path': package_path})
                    os.unlink(package_path)
                else:
                    logger.debug('%(log_prefix)s reuse already downloaded package file %(package_path)s',
                                 {'log_prefix': package_log_prefix, 'package_path': package_path})

            if not os.path.isfile(package_path):
                logger.debug('%(log_prefix)s download package from %(url)s to %(package_path)s',
                             {'log_prefix': package_log_prefix,
                              'url': url,
                              'package_path': package_path})
                download_file(url, package_path, proxy=proxy)

            deb = debfile.DebFile(package_path)
            if not is_application_service_package(deb):
                logger.debug('%(log_prefix)s skip package without any application service files',
                             {'log_prefix': package_log_prefix})
                os.unlink(package_path)
                continue

            logger.info('%(log_prefix)s generate application service',
                        {'log_prefix': package_log_prefix})

            package_application_service_dir = os.path.join(base_application_service_dir, package['Package'])
            service_filename = os.path.join(package_application_service_dir, 'service.yml')

            if os.path.isdir(package_application_service_dir):
                logger.debug('%(log_prefix)s delete already existent directory %(package_application_service_dir)s',
                             {'log_prefix': package_log_prefix,
                              'package_application_service_dir': package_application_service_dir})
                shutil.rmtree(package_application_service_dir)

            # Take care to dump Description as literal string using YAML |-
            # Remove any line heading spaces on multiline description
            service = {'format': '0.1',
                       'name': package['Package'],
                       'version': PreservedScalarString(package['Version']),
                       'description': PreservedScalarString(package['Description'].replace('\n ', '\n')),
                       'depends': [],
                       'packages': [],
                       'dictionaries': [],
                       'extra_dictionaries': {},
                       'templates': [],
                       'creole_funcs': [],
                       'preservices': [],
                       'postservices': [],
                       'pretemplates': [],
                       'posttemplates': [],
                       'files': {},
                       'excluding_regexp': {},
                       }

            service['depends'].extend(parse_deps(package, vendor, distribution, release))

            service = extract_files(service,
                                    deb,
                                    package_application_service_dir,
                                    vendor,
                                    distribution,
                                    release,
                                    options)

            with open(service_filename, 'w', encoding='utf-8') as yaml_fh:
                yaml.dump(service, yaml_fh)

            # Register the service only once it is completely generated:
            # doing it after the “except” would either fail on an unbound
            # name or register again the service of the previous package.
            # The package file is kept so that a later run can reuse it.
            services[service['name']] = service
        except Exception as error:  # pylint: disable=broad-except
            logger.error('%(log_prefix)s %(error)s',
                         {'log_prefix': package_log_prefix, 'error': error})

    return services


def parse_deps(package, vendor, distribution, release):
    """Parse package dependencies string

    :param `debian.deb822.Package` package: package informations
    :param str vendor: vendor name
    :param str distribution: distribution name
    :param str release: EOLE release
    :return: one item per dependency: the package name, or
             `{'or': [names]}` for alternative dependencies
    :rtype: list

    """
    log_prefix = f'Release “{release}”: Vendor “{vendor}”: Distribution “{distribution}”: Package “{package["Package"]}”: Version “{package["Version"]}”:'
    logger.debug('%(log_prefix)s parse dependencies “%(dependencies)s”',
                 {'log_prefix': log_prefix, 'dependencies': package.get("Depends", "")})

    depends = []
    # Each relation is the list of its alternatives (“a | b”)
    for dependency in package.relations["depends"]:
        logger.debug('%(log_prefix)s parse dependency “%(dependency)s”',
                     {'log_prefix': log_prefix, 'dependency': dependency})

        if len(dependency) > 1:
            logger.debug('%(log_prefix)s found alternative dependencies “%(dependencies)s”',
                         {'log_prefix': log_prefix, 'dependencies': dependency})
            alternatives = []
            for alternative in dependency:
                logger.debug('%(log_prefix)s add alternative dependency “%(dependency)s”',
                             {'log_prefix': log_prefix, 'dependency': alternative['name']})
                alternatives.append(alternative['name'])
            dep = {'or': alternatives}
        else:
            dep = dependency[0]['name']

        logger.debug('%(log_prefix)s add dependency “%(dependency)s”',
                     {'log_prefix': log_prefix, 'dependency': dep})
        depends.append(dep)

    return depends


def filter_deps(dependencies, service, vendor_services, vendor, vendor_release, release, options):
    """Filter service dependencies between services and packages

    :param list dependencies: dependencies as returned by `parse_deps`:
                              names or `{'or': [names]}` items
    :param dict service: service informations
    :param dict vendor_services: services per vendor and vendor_release
    :param str vendor: vendor name
    :param str vendor_release: vendor release
    :param str release: EOLE release
    :param Namespace options: options passed to the script
    :return: the dependencies split under the `services` and
             `packages` keys
    :rtype: dict

    """
    log_prefix = f'Release “{release}”: Vendor “{vendor}”: Vendor release “{vendor_release}”: Service “{service["name"]}”:'
    logger.debug('%(log_prefix)s filter dependencies “%(dependencies)s”',
                 {'log_prefix': log_prefix, 'dependencies': dependencies})

    filtered = {'services': [], 'packages': []}

    for dependency in dependencies:
        logger.debug('%(log_prefix)s filter dependency “%(dependency)s”',
                     {'log_prefix': log_prefix, 'dependency': dependency})

        if isinstance(dependency, dict) and 'or' in dependency:
            sub_filtered = filter_deps(dependency['or'],
                                       service,
                                       vendor_services,
                                       vendor,
                                       vendor_release,
                                       release,
                                       options)
            if sub_filtered['services'] and sub_filtered['packages']:
                logger.error('%(log_prefix)s package should not depends on a service “or” a package',
                             {'log_prefix': log_prefix})
                # Above error must be handled manually

            for dep_type in ['services', 'packages']:
                if not sub_filtered[dep_type]:
                    # No alternative of this type: an empty “or” group would
                    # pollute the description and break the XML generation
                    continue
                logger.debug('%(log_prefix)s Dependency “%(dependency)s”: register “or” %(dependency_type)s dependencies',
                             {'log_prefix': log_prefix,
                              'dependency': sub_filtered[dep_type],
                              'dependency_type': dep_type})
                filtered[dep_type].append({'or': sub_filtered[dep_type]})
        elif dependency not in EXCLUDED_PACKAGES and is_service(dependency, vendor_services):
            logger.debug('%(log_prefix)s Dependency “%(dependency)s”: register service dependency',
                         {'log_prefix': log_prefix, 'dependency': dependency})
            filtered['services'].append(dependency)
        else:
            logger.debug('%(log_prefix)s Dependency “%(dependency)s”: register package dependency',
                         {'log_prefix': log_prefix, 'dependency': dependency})
            filtered['packages'].append(dependency)

    return filtered


def gen_packages_xml_schema(service, vendor, vendor_release, release):
    """Generate the EOLE XML schema for packages

    For alternative packages, only the first one is declared, the
    others are written as XML comments.

    :param dict service: service informations
    :param str vendor: vendor name
    :param str vendor_release: vendor release
    :param str release: EOLE release
    :return: the XML document
    :rtype: str

    """
    log_prefix = f'Release “{release}”: Vendor “{vendor}”: Vendor release “{vendor_release}”: Service “{service["name"]}”:'
    logger.debug('%(log_prefix)s generate EOLE XML schema for packages',
                 {'log_prefix': log_prefix})

    xml_schema_start = """<?xml version="1.0" encoding="utf-8"?>
<creole>
    <files>
"""
    xml_schema_end = """    </files>
    <variables />
    <constraints />
    <help />
</creole>
"""

    xml_packages = []
    for package in service['packages']:
        if isinstance(package, dict) and 'or' in package:
            if not package['or']:
                # Nothing to declare for an empty group of alternatives
                continue

            # Unpacking builds new objects: the reference is not modified
            first_package, *other_packages = package['or']

            logger.debug('%(log_prefix)s keep only first package in packages EOLE XML schema “%(package)s”',
                         {'log_prefix': log_prefix, 'package': first_package})
            xml_packages.append(f'        <package>{first_package}</package>\n')

            for other_package in other_packages:
                logger.debug('%(log_prefix)s add commented XML package “%(package)s”',
                             {'log_prefix': log_prefix, 'package': other_package})
                xml_packages.append(f'        <!-- <package>{other_package}</package> -->\n')
        else:
            logger.debug('%(log_prefix)s add XML package “%(package)s”',
                         {'log_prefix': log_prefix, 'package': package})
            xml_packages.append(f'        <package>{package}</package>\n')

    return xml_schema_start + ''.join(xml_packages) + xml_schema_end


def extract_files(service, package, directory, vendor, distribution, release, options):
    """Extract all files and register them in the YAML service structure

    The data archive is extracted in a temporary directory, then every
    file which is not excluded is copied under `directory`, in a sub
    directory named after its type, and registered in `service`.

    :param dict service: package description
    :param `debian.debfile.DebFile` package: python object representing the package
    :param str directory: directory where to extract the package files
    :param str vendor: vendor name
    :param str distribution: distribution name
    :param str release: EOLE release
    :param Namespace options: options passed to the script
    :return: the updated `service`
    :rtype: dict

    """
    control = package.control.debcontrol()
    package_name = control["Package"]
    package_version = control["Version"]
    log_prefix = f'Release “{release}”: Vendor “{vendor}”: Distribution “{distribution}”: Package “{package_name}”: Version “{package_version}”:'
    logger.debug('%(log_prefix)s extract and register application service files',
                 {'log_prefix': log_prefix})

    if not os.access(directory, os.W_OK):
        logger.debug('%(log_prefix)s create temporary extract directory “%(directory)s”',
                     {'log_prefix': log_prefix, 'directory': directory})
        # The caller writes service.yml in this directory even when no
        # file is registered
        os.makedirs(directory, mode=0o700, exist_ok=True)

    extract_dir = os.path.join(options.temp_dir, 'extract', package_name)
    if os.path.isdir(extract_dir):
        logger.debug('%(log_prefix)s delete existing directory “%(extract_dir)s”',
                     {'log_prefix': log_prefix, 'extract_dir': extract_dir})
        shutil.rmtree(extract_dir)

    logger.debug('%(log_prefix)s extract data archive to “%(extract_dir)s”',
                 {'log_prefix': log_prefix, 'extract_dir': extract_dir})
    # NOTE(review): extractall() trusts the member names of the archive,
    # only use this script with trusted mirrors
    package.data.tgz().extractall(path=extract_dir)

    try:
        for filename in package.data:
            source = os.path.abspath(os.path.join(extract_dir, filename))
            basename = os.path.basename(filename)

            if os.path.isdir(source):
                logger.debug('%(log_prefix)s skip directory “%(filename)s”',
                             {'log_prefix': log_prefix, 'filename': filename})
                continue

            excluded_match = is_file_excluded(filename)
            if excluded_match:
                logger.debug('%(log_prefix)s skip excluded file “%(filename)s”',
                             {'log_prefix': log_prefix, 'filename': filename})
                # Count the files ignored by each exclusion rule
                exclude = excluded_match.group(1)
                service['excluding_regexp'][exclude] = service['excluding_regexp'].get(exclude, 0) + 1
                continue

            logger.debug('%(log_prefix)s process file “%(filename)s”',
                         {'log_prefix': log_prefix, 'filename': filename})

            file_type_match = match_any(filename, FILE_RULES)
            if file_type_match:
                file_type = FILE_MAP[file_type_match.group(1)]
            else:
                file_type = 'files'

            logger.debug('%(log_prefix)s register file in “%(file_type)s”: “%(filename)s”',
                         {'log_prefix': log_prefix, 'file_type': file_type, 'filename': filename})

            type_dir = os.path.join(directory, file_type)
            if not os.access(type_dir, os.W_OK):
                logger.debug('%(log_prefix)s create temporary “%(file_type)s” directory “%(type_dir)s”',
                             {'log_prefix': log_prefix, 'file_type': file_type, 'type_dir': type_dir})
                os.makedirs(type_dir, mode=0o700, exist_ok=True)

            if file_type == 'extra_dictionaries':
                # Extra dictionnaries are kept under component sub directory (schedule, …)
                component = os.path.basename(os.path.dirname(filename))
                dest = os.path.abspath(os.path.join(type_dir, component, basename))
                service[file_type].setdefault(component, []).append(basename)
            elif file_type == 'files':
                # Only the execution permission is kept, as a file mode
                mode = '0755' if os.access(source, os.X_OK) else '0644'
                normed_path = os.path.normpath(os.path.join('/', filename))
                service[file_type][normed_path] = {'owner': 'root',
                                                   'group': 'root',
                                                   'mode': mode}
                # Plain files keep their full path under the type directory
                dest = os.path.abspath(os.path.join(type_dir, filename))
            else:
                dest = os.path.abspath(os.path.join(type_dir, basename))
                service[file_type].append(basename)

            dest_dir = os.path.dirname(dest)
            if not os.access(dest_dir, os.W_OK):
                logger.debug('%(log_prefix)s create destination directory “%(dest_dir)s”',
                             {'log_prefix': log_prefix, 'dest_dir': dest_dir})
                os.makedirs(dest_dir, mode=0o700, exist_ok=True)

            logger.debug('%(log_prefix)s copy file “%(filename)s” to “%(dest)s”',
                         {'log_prefix': log_prefix, 'filename': filename, 'dest': dest})
            # Do not preserve execution permission on copy
            # Recreate symbolic link instead of copying the source target
            shutil.copyfile(source, dest, follow_symlinks=False)
    finally:
        # Do not leave the extracted archive behind, even on error
        logger.debug('%(log_prefix)s delete application service extraction directory “%(extract_dir)s”',
                     {'log_prefix': log_prefix, 'extract_dir': extract_dir})
        shutil.rmtree(extract_dir)

    return service


def is_service(name, vendor_services):
    """Verify if `name` is declared as a service in any vendor

    :param str name: name of a package
    :param dict vendor_services: services per vendor and vendor_release
    :return: `True` if any vendor release declares a service named `name`
    :rtype: bool

    """
    return any(name in service_infos
               for vendor_infos in vendor_services.values()
               for service_infos in vendor_infos.values())


def is_application_service_package(package):
    """Verify if the package has any application service files

    :param `debian.debfile.DebFile` package: python object representing the package
    :return: `True` if a file of the package matches `SCHEMA_RULES`
    :rtype: bool

    """
    return any(is_application_service_schema(filename)
               for filename in package.data)


def is_application_service_schema(filename):
    """Match on EOLE application service XML schema

    :param str filename: name of a file
    :return: the `Match` object or `None`

    """
    return match_any(filename, SCHEMA_RULES)


def is_application_service_file(filename):
    """Match on EOLE application service filename

    :param str filename: name of a file
    :return: the `Match` object or `None`

    """
    return match_any(filename, FILE_RULES)


def is_file_excluded(filename):
    """Match on excluded file regex

    :param str filename: name of a file
    :return: the `Match` object or `None`

    """
    return match_any(filename, EXCLUDED_FILES_RULES)


def download_file(url, filename, proxy=None):
    """Download a file with HTTP

    :param str url: URL of the file to download
    :param str filename: path where to store the content
    :param str proxy: optional proxy URL used for HTTP and HTTPS requests
    :raise Exception: when the server does not answer with a success status

    """
    proxies = {'http': proxy, 'https': proxy} if proxy else None

    # The context manager releases the connection whatever happens
    with requests.get(url, stream=True, proxies=proxies) as response:
        if not response.ok:
            raise Exception(f'Can not download {url}: {response.reason}')

        # The raw stream is not decoded by default: without this, a
        # compressed transfer would be stored compressed
        response.raw.decode_content = True
        with open(filename, 'wb') as file_h:
            shutil.copyfileobj(response.raw, file_h)


def match_any(string, regexes):
    """Match a string against a list of regex

    :param str string: string to match
    :param list regexes: list of regular expression
    :return: the `Match` object of the first matching regex or `None`

    """
    for regex in regexes:
        match = regex.match(string)
        if match:
            return match
    return None


if __name__ == '__main__':
    main()