<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Module containing classes, variables, etc. for creating OVAL content
#
# Author: David Ries &lt;ries@jovalcm.com&gt;
# Author: Joy Latten &lt;joy.latten@canonical.com&gt;
# Copyright (C) 2015 Farnam Hall Ventures LLC
# Copyright (C) 2019 Canonical Ltd.
#
# This script is distributed under the terms and conditions of the GNU General
# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html
# for details.
#
# NOTES / TODOs
# This script creates OVAL ids based on the related CVE ID but does not
# currently increment the version number of generated elements when they
# change.

from __future__ import unicode_literals

from datetime import datetime, timezone
import io
import os
import random
import re
import shutil
import sys
import tempfile
import collections

from cve_lib import load_cve, get_subproject_description

from xml.sax.saxutils import escape

def recursive_rm(dirPath):
    '''recursively remove directory'''
    names = os.listdir(dirPath)
    for name in names:
        path = os.path.join(dirPath, name)
        if not os.path.isdir(path):
            os.unlink(path)
        else:
            recursive_rm(path)
    os.rmdir(dirPath)

def _open(fn, mode, encoding='utf-8'):
    """ open file """
    fd = None
    if sys.version_info[0] &lt; 3:
        fd = io.open(fn, mode=mode, encoding=encoding)
    else:
        fd = open(fn, mode=mode, encoding=encoding)
    return fd

class OvalGenerator:
    supported_oval_elements = ('definition', 'test', 'object', 'state',
                               'variable')
    generator_version = '1.1'
    oval_schema_version = '5.11.1'

    def __init__(self, release, release_name, parent, warn_method=False, outdir='./', prefix='', oval_format='dpkg'):
        """ constructor, set defaults for instances """

        self.release = release
        # e.g. codename for trusty/esm should be trusty
        self.release_codename = parent if parent else self.release.replace('/', '_')
        self.release_name = release_name
        self.warn = warn_method or self.warn
        self.tmpdir = tempfile.mkdtemp(prefix='oval_lib-')
        self.output_dir = outdir
        self.oval_format = oval_format
        self.output_filepath = \
            '{0}com.ubuntu.{1}.cve.oval.xml'.format(prefix, self.release.replace('/', '_'))
        self.ns = 'oval:com.ubuntu.{0}'.format(self.release_codename)
        self.id = 10
        self.release_applicability_definition_id = '{0}:def:{1}0'.format(self.ns, self.id)

    def __del__(self):
        """ deconstructor, clean up """
        if os.path.exists(self.tmpdir):
            recursive_rm(self.tmpdir)

    def generate_cve_definition(self, cve):
        """ generate an OVAL definition based on parsed CVE data """

        header = cve['header']
        # if the multiplier is not large enough, the tests IDs will
        # overlap on things with large numbers of binary packages.
        # if we ever have an issue that touches more than 1,000,000
        # binary packages, that will cause a problem.
        id_base = int(re.sub('[^0-9]', '', header['Candidate'])) * 1000000
        if not self.unique_id_base(id_base, header['Source-note']):
            self.warn('Calculated id_base "{0}" based on candidate value "{1}" is not unique. Skipping CVE.'.format(id_base, header['Candidate']))

        # make test(s) for each package
        test_refs = []
        packages = cve['packages']
        for package in sorted(packages.keys()):
            releases = packages[package]['Releases']
            for release in sorted(releases.keys()):
                if release == self.release:
                    release_status = releases[release]
                    if 'bin-pkgs' in release_status and release_status['bin-pkgs']:
                        test_ref = self.get_oval_test_for_package({
                            'name': package,
                            'binaries': release_status['bin-pkgs'],
                            'status': release_status['status'],
                            'note': release_status['note'],
                            'fix-version': release_status['fix-version'] if 'fix-version' in release_status else '',
                            'id_base': id_base + len(test_refs),
                            'source-note': header['Source-note']
                        })
                        if test_ref:
                            test_refs.append(test_ref)

        # if no packages for this release, then we're done
        if not len(test_refs):
            return False

        # convert CVE data to OVAL definition metadata
        mapping = {
            'ns': escape(self.ns),
            'id_base': id_base,
            'codename': escape(self.release_codename),
            'release_name': escape(self.release_name),
            'applicability_def_id': escape(
                self.release_applicability_definition_id),
            'cve_title': escape(header['Candidate']),
            'description': escape('{0} {1}'.format(header['Description'],
                                  header['Ubuntu-Description']).strip()),
            'priority': escape(header['Priority']),
            'criteria': '',
            'references': '',
            'notes': ''
        }

        # convert test_refs to criteria
        if len(test_refs) == 1:
            negation_attribute = 'negate = "true" ' \
                if 'negate' in test_refs[0] and test_refs[0]['negate'] else ''
            mapping['criteria'] = \
                '&lt;criterion test_ref="{0}" comment="{1}" {2}/&gt;'.format(
                    test_refs[0]['id'], escape(test_refs[0]['comment']), negation_attribute)
        else:
            criteria = []
            criteria.append('&lt;criteria operator="OR"&gt;')
            for test_ref in test_refs:
                negation_attribute = 'negate = "true" ' \
                    if 'negate' in test_ref and test_ref['negate'] else ''
                criteria.append(
                    '    ' +
                    '&lt;criterion test_ref="{0}" comment="{1}" {2}/&gt;'.format(
                        test_ref['id'],
                        escape(test_ref['comment']), negation_attribute))
            criteria.append('&lt;/criteria&gt;')
            mapping['criteria'] = '\n                    '.join(criteria)

        # convert notes
        if header['Notes']:
            mapping['notes'] = '\n                &lt;oval:notes&gt;' + \
                               '\n                    &lt;oval:note&gt;{0}&lt;/oval:note&gt;'.format(escape(header['Notes'])) + \
                               '\n                &lt;/oval:notes&gt;'

        # convert additional data &lt;advisory&gt; metadata elements
        advisory = []
        advisory.append('&lt;severity&gt;{0}&lt;/severity&gt;'.format(
            escape(header['Priority'].title())))
        advisory.append(
            '&lt;rights&gt;Copyright (C) {0}Canonical Ltd.&lt;/rights&gt;'.format(escape(
                header['PublicDate'].split('-', 1)[0] + ' '
                if header['PublicDate'] else '')))
        if header['PublicDate']:
            advisory.append('&lt;public_date&gt;{0}&lt;/public_date&gt;'.format(
                escape(header['PublicDate'])))
        if header['PublicDateAtUSN']:
            advisory.append(
                '&lt;public_date_at_usn&gt;{0}&lt;/public_date_at_usn&gt;'.format(escape(
                    header['PublicDateAtUSN'])))
        if header['Assigned-to']:
            advisory.append('&lt;assigned_to&gt;{0}&lt;/assigned_to&gt;'.format(escape(
                header['Assigned-to'])))
        if header['Discovered-by']:
            advisory.append('&lt;discovered_by&gt;{0}&lt;/discovered_by&gt;'.format(escape(
                header['Discovered-by'])))
        if header['CRD']:
            advisory.append('&lt;crd&gt;{0}&lt;/crd&gt;'.format(escape(header['CRD'])))
        for bug in header['Bugs']:
            advisory.append('&lt;bug&gt;{0}&lt;/bug&gt;'.format(escape(bug)))
        for ref in header['References']:
            if ref.startswith('https://cve.mitre'):
                cve_title = ref.split('=')[-1].strip()
                if not cve_title:
                    continue
                mapping['cve_title'] = escape(cve_title)
                mapping['references'] = '\n                    &lt;reference source="CVE" ref_id="{0}" ref_url="{1}" /&gt;'.format(mapping['cve_title'], escape(ref))
            else:
                advisory.append('&lt;ref&gt;{0}&lt;/ref&gt;'.format(escape(ref)))
        mapping['advisory_elements'] = '\n                        '.join(advisory)

        if self.oval_format == 'dpkg':
            mapping['os_release_check'] = """&lt;extend_definition definition_ref="{applicability_def_id}" comment="{release_name} ({codename}) is installed." applicability_check="true" /&gt;""".format(**mapping)
        else:
            mapping['os_release_check'] = ''

        self.queue_element('definition', """
            &lt;definition class="vulnerability" id="{ns}:def:{id_base}0" version="1"&gt;
                &lt;metadata&gt;
                    &lt;title&gt;{cve_title} on {release_name} ({codename}) - {priority}.&lt;/title&gt;
                    &lt;description&gt;{description}&lt;/description&gt;
                    &lt;affected family="unix"&gt;
                        &lt;platform&gt;{release_name}&lt;/platform&gt;
                    &lt;/affected&gt;{references}
                    &lt;advisory&gt;
                        {advisory_elements}
                    &lt;/advisory&gt;
                &lt;/metadata&gt;{notes}
                &lt;criteria&gt;
                    {os_release_check}
                    {criteria}
                &lt;/criteria&gt;
            &lt;/definition&gt;\n""".format(**mapping))

    def get_oval_test_for_package(self, package):
        """ create OVAL test and dependent objects for this package status
                @package = {
                    'name'          : '&lt;package name&gt;',
                    'binaries'      : [ '&lt;binary_pkg_name', '&lt;binary_pkg_name', ... ],
                    'status'        : '&lt;not-applicable | unknown | vulnerable | fixed&gt;',
                    'note'          : '&lt;a description of the status&gt;',
                    'fix-version'   : '&lt;the version in which the issue was fixed, if applicable&gt;',
                    'id_base'       : a base for the integer section of the OVAL id,
                    'source-note'   : a note about the datasource for debugging
                }
        """

        if package['status'] == 'fixed' and not package['fix-version']:
            self.warn('"{0}" package in {1} is marked fixed, but missing a fix-version. Changing status to vulnerable.'.format(package['name'], package['source-note']))
            package['status'] = 'vulnerable'

        if package['status'] == 'not-applicable':
            # if the package status is not-applicable, skip it!
            return False
        elif package['status'] == 'not-vulnerable':
            # if the packaget status is not-vulnerable, skip it!
            return False
            """
            object_id = self.get_package_object_id(package['name'], package['id_base'], 1)

            test_title = "Returns true whether or not the '{0}' package exists.".format(package['name'])
            test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id, None, 1, 'any_exist')

            package['note'] = package['name'] + package['note']
            return {'id': test_id, 'comment': package['note'], 'negate': True}
            """
        elif package['status'] == 'vulnerable':
            object_id = self.get_package_object_id(package['name'], package['binaries'], package['id_base'])

            test_title = "Does the '{0}' package exist?".format(package['name'])
            test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id)

            package['note'] = package['name'] + package['note']
            return {'id': test_id, 'comment': package['note']}
        elif package['status'] == 'fixed':
            object_id = self.get_package_object_id(package['name'], package['binaries'], package['id_base'])

            state_id = self.get_package_version_state_id(package['id_base'], package['fix-version'])

            test_title = "Does the '{0}' package exist and is the version less than '{1}'?".format(package['name'], package['fix-version'])
            test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id, state_id)

            package['note'] = package['name'] + package['note']
            return {'id': test_id, 'comment': package['note']}
        else:
            if package['status'] != 'unknown':
                self.warn('"{0}" is not a supported package status. Outputting for "unknown" status.'.format(package['status']))

            if not hasattr(self, 'id_unknown_test'):
                self.id_unknown_test = '{0}:tst:10'.format(self.ns)
                self.queue_element('test', """
                    &lt;ind-def:unknown_test id="{0}" check="all" comment="The result of this test is always UNKNOWN." version="1" /&gt;\n""".format(self.id_unknown_test))

            package['note'] = package['name'] + package['note']
            return {'id': self.id_unknown_test, 'comment': package['note']}

    def add_release_applicability_definition(self):
        """ add platform/release applicability OVAL definition for codename """

        mapping = {
            'ns': self.ns,
            'id_base': self.id,
            'codename': self.release_codename,
            'release_name': self.release_name,
        }
        self.release_applicability_definition_id = \
            '{ns}:def:{id_base}0'.format(**mapping)

        if self.oval_format == 'dpkg':
            self.queue_element('definition', """
                &lt;definition class="inventory" id="{ns}:def:{id_base}0" version="1"&gt;
                    &lt;metadata&gt;
                        &lt;title&gt;Check that {release_name} ({codename}) is installed.&lt;/title&gt;
                        &lt;description&gt;&lt;/description&gt;
                    &lt;/metadata&gt;
                    &lt;criteria&gt;
                        &lt;criterion test_ref="{ns}:tst:{id_base}0" comment="The host is part of the unix family." /&gt;
                        &lt;criterion test_ref="{ns}:tst:{id_base}1" comment="The host is running Ubuntu {codename}." /&gt;
                    &lt;/criteria&gt;
                &lt;/definition&gt;\n""".format(**mapping))

            self.queue_element('test', """
                &lt;ind-def:family_test id="{ns}:tst:{id_base}0" check="at least one" check_existence="at_least_one_exists" version="1" comment="Is the host part of the unix family?"&gt;
                    &lt;ind-def:object object_ref="{ns}:obj:{id_base}0"/&gt;
                    &lt;ind-def:state state_ref="{ns}:ste:{id_base}0"/&gt;
                &lt;/ind-def:family_test&gt;

                &lt;ind-def:textfilecontent54_test id="{ns}:tst:{id_base}1" check="at least one" check_existence="at_least_one_exists" version="1" comment="Is the host running Ubuntu {codename}?"&gt;
                    &lt;ind-def:object object_ref="{ns}:obj:{id_base}1"/&gt;
                    &lt;ind-def:state state_ref="{ns}:ste:{id_base}1"/&gt;
                &lt;/ind-def:textfilecontent54_test&gt;\n""".format(**mapping))

            # /etc/lsb-release has to be a single path, due to some
            # environments (namely snaps) not being allowed to list the
            # content of /etc/
            self.queue_element('object', """
                &lt;ind-def:family_object id="{ns}:obj:{id_base}0" version="1" comment="The singleton family object."/&gt;

                &lt;ind-def:textfilecontent54_object id="{ns}:obj:{id_base}1" version="1" comment="The singleton release codename object."&gt;
                    &lt;ind-def:filepath&gt;/etc/lsb-release&lt;/ind-def:filepath&gt;
                    &lt;ind-def:pattern operation="pattern match"&gt;^[\\s\\S]*DISTRIB_CODENAME=([a-z]+)$&lt;/ind-def:pattern&gt;
                    &lt;ind-def:instance datatype="int"&gt;1&lt;/ind-def:instance&gt;
                &lt;/ind-def:textfilecontent54_object&gt;\n""".format(**mapping))

            self.queue_element('state', """
                &lt;ind-def:family_state id="{ns}:ste:{id_base}0" version="1" comment="The singleton family object."&gt;
                    &lt;ind-def:family&gt;unix&lt;/ind-def:family&gt;
                &lt;/ind-def:family_state&gt;

                &lt;ind-def:textfilecontent54_state id="{ns}:ste:{id_base}1" version="1" comment="{release_name}"&gt;
                    &lt;ind-def:subexpression&gt;{codename}&lt;/ind-def:subexpression&gt;
                &lt;/ind-def:textfilecontent54_state&gt;\n""".format(**mapping))

    def get_package_object_id(self, name, bin_pkgs, id_base, version=1):
        """ create unique object for each package and return its OVAL id """
        if not hasattr(self, 'package_objects'):
            self.package_objects = {}

        key = tuple(sorted(bin_pkgs))

        if key not in self.package_objects:
            object_id = '{0}:obj:{1}0'.format(self.ns, id_base)

            if len(bin_pkgs) &gt; 1:
                # create variable for binary package names
                variable_id = '{0}:var:{1}0'.format(self.ns, id_base)
                if self.oval_format == 'dpkg':
                    variable_values = '&lt;/value&gt;\n                        &lt;value&gt;'.join(bin_pkgs)
                    self.queue_element('variable', """
                        &lt;constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries"&gt;
                            &lt;value&gt;{3}&lt;/value&gt;
                        &lt;/constant_variable&gt;\n""".format(variable_id, version, name, variable_values))

                    # create an object that references the variable
                    self.queue_element('object', """
                        &lt;linux-def:dpkginfo_object id="{0}" version="{1}" comment="The '{2}' package binaries."&gt;
                            &lt;linux-def:name var_ref="{3}" var_check="at least one" /&gt;
                        &lt;/linux-def:dpkginfo_object&gt;\n""".format(object_id, version, name, variable_id))

                else:
                    variable_values = '\s+(.*)&lt;/value&gt;\n                        &lt;value&gt;^'.join(bin_pkgs)
                    self.queue_element('variable', """
                        &lt;constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries"&gt;
                            &lt;value&gt;^{3}\s+(.*)&lt;/value&gt;
                        &lt;/constant_variable&gt;\n""".format(variable_id, version, name, variable_values))

                    # create an object that references the variable
                    self.queue_element('object', """
                        &lt;ind-def:textfilecontent54_object id="{0}" version="{1}" comment="The '{2}' package binaries."&gt;
                            &lt;ind-def:path&gt;.&lt;/ind-def:path&gt;
                            &lt;ind-def:filename&gt;manifest&lt;/ind-def:filename&gt;
                            &lt;ind-def:pattern operation="pattern match" datatype="string" var_ref="{3}" var_check="at least one" /&gt;
                            &lt;ind-def:instance operation="greater than or equal" datatype="int"&gt;1&lt;/ind-def:instance&gt;
                        &lt;/ind-def:textfilecontent54_object&gt;\n""".format(object_id, version, name, variable_id))

            else:
                if self.oval_format == 'dpkg':
                    # 1 binary package, so just use name in object (no variable)
                    self.queue_element('object', """
                        &lt;linux-def:dpkginfo_object id="{0}" version="{1}" comment="The '{2}' package binary."&gt;
                            &lt;linux-def:name&gt;{3}&lt;/linux-def:name&gt;
                        &lt;/linux-def:dpkginfo_object&gt;\n""".format(object_id, version, name, bin_pkgs[0]))
                else:
                    variable_id = '{0}:var:{1}0'.format(self.ns, id_base)
                    variable_values = '\s+(.*)&lt;/value&gt;\n                        &lt;value&gt;^'.join(bin_pkgs)
                    self.queue_element('variable', """
                        &lt;constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries"&gt;
                            &lt;value&gt;^{3}\s+(.*)&lt;/value&gt;
                        &lt;/constant_variable&gt;\n""".format(variable_id, version, name, variable_values))
                    self.queue_element('object', """
                        &lt;ind-def:textfilecontent54_object id="{0}" version="{1}" comment="The '{2}' package binary."&gt;
                            &lt;ind-def:path&gt;.&lt;/ind-def:path&gt;
                            &lt;ind-def:filename&gt;manifest&lt;/ind-def:filename&gt;
                            &lt;ind-def:pattern operation="pattern match" datatype="string" var_ref="{3}" var_check="at least one" /&gt;
                            &lt;ind-def:instance operation="greater than or equal" datatype="int"&gt;1&lt;/ind-def:instance&gt;
                        &lt;/ind-def:textfilecontent54_object&gt;\n""".format(object_id, version, name, variable_id))

            self.package_objects[key] = object_id

        return self.package_objects[key]

    def get_package_version_state_id(self, id_base, fix_version, version=1):
        """ create unique states for each version and return its OVAL id """
        if not hasattr(self, 'package_version_states'):
            self.package_version_states = {}

        key = fix_version
        if key not in self.package_version_states:
            state_id = '{0}:ste:{1}0'.format(self.ns, id_base)
            if self.oval_format == 'dpkg':
                epoch_fix_version = fix_version if fix_version.find(':') != -1 else "0:" + fix_version
                self.queue_element('state', """
                    &lt;linux-def:dpkginfo_state id="{0}" version="{1}" comment="The package version is less than '{2}'."&gt;
                        &lt;linux-def:evr datatype="debian_evr_string" operation="less than"&gt;{2}&lt;/linux-def:evr&gt;
                    &lt;/linux-def:dpkginfo_state&gt;\n""".format(state_id, version, epoch_fix_version))
            else:
                self.queue_element('state', """
                    &lt;ind-def:textfilecontent54_state id="{0}" version="{1}" comment="The package version is less than '{2}'."&gt;
                        &lt;ind-def:subexpression datatype="debian_evr_string" operation="less than"&gt;{2}&lt;/ind-def:subexpression&gt;
                    &lt;/ind-def:textfilecontent54_state&gt;\n""".format(state_id, version, fix_version))
            self.package_version_states[key] = state_id

        return self.package_version_states[key]

    def get_package_test_id(self, name, id_base, test_title, object_id, state_id=None, version=1, check_existence='at_least_one_exists'):
        """ create unique test for each parameter set and return its OVAL id """
        if not hasattr(self, 'package_tests'):
            self.package_tests = {}

        key = (name, test_title, object_id, state_id)
        if key not in self.package_tests:
            test_id = '{0}:tst:{1}0'.format(self.ns, id_base)
            if self.oval_format == 'dpkg':
                state_ref = '\n                    &lt;linux-def:state state_ref="{0}" /&gt;'.format(state_id) if state_id else ''
                self.queue_element('test', """
                    &lt;linux-def:dpkginfo_test id="{0}" version="{1}" check_existence="{5}" check="at least one" comment="{2}"&gt;
                        &lt;linux-def:object object_ref="{3}"/&gt;{4}
                    &lt;/linux-def:dpkginfo_test&gt;\n""".format(test_id, version, test_title, object_id, state_ref, check_existence))
            else:
                state_ref = '\n                    &lt;ind-def:state state_ref="{0}" /&gt;'.format(state_id) if state_id else ''
                self.queue_element('test', """
                    &lt;ind-def:textfilecontent54_test id="{0}" version="{1}" check_existence="{5}" check="at least one" comment="{2}"&gt;
                        &lt;ind-def:object object_ref="{3}"/&gt;{4}
                    &lt;/ind-def:textfilecontent54_test&gt;\n""".format(test_id, version, test_title, object_id, state_ref, check_existence))
            self.package_tests[key] = test_id

        return self.package_tests[key]

    def queue_element(self, element, xml):
        """ add an OVAL element to an output queue file """
        if element not in OvalGenerator.supported_oval_elements:
            self.warn('"{0}" is not a supported OVAL element.'.format(element))
            return

        if not hasattr(self, 'tmp'):
            self.tmp = {}
            self.tmp_n = random.randrange(1000000, 9999999)

        if element not in self.tmp:
            self.tmp[element] = _open(os.path.join(self.tmpdir,
                                           './queue.{0}.{1}.xml'.format(
                                               self.tmp_n, element)), 'wt')

        # trim and fix indenting (assumes fragment is nicely indented internally)
        xml = xml.strip('\n')
        base_indent = re.match(r'\s*', xml).group(0)
        xml = re.sub('^{0}'.format(base_indent), '        ', xml, 0,
                     re.MULTILINE)

        self.tmp[element].write(xml + '\n')

    def write_to_file(self):
        """ dequeue all elements into one OVAL definitions file and clean up """
        if not hasattr(self, 'tmp'):
            return

        # close queue files for writing and then open for reading
        for key in self.tmp:
            self.tmp[key].close()
            self.tmp[key] = _open(self.tmp[key].name, 'rt')

        tmp = os.path.join(self.tmpdir, self.output_filepath)
        with _open(tmp, 'wt') as f:
            # add header
            oval_timestamp = datetime.now(tz=timezone.utc).strftime(
                '%Y-%m-%dT%H:%M:%S')
            f.write("""&lt;oval_definitions
    xmlns="http://oval.mitre.org/XMLSchema/oval-definitions-5"
    xmlns:ind-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#independent"
    xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5"
    xmlns:unix-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix"
    xmlns:linux-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd   http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd   http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd   http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd   http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd"&gt;

    &lt;generator&gt;
        &lt;oval:product_name&gt;Canonical CVE OVAL Generator&lt;/oval:product_name&gt;
        &lt;oval:product_version&gt;{0}&lt;/oval:product_version&gt;
        &lt;oval:schema_version&gt;{1}&lt;/oval:schema_version&gt;
        &lt;oval:timestamp&gt;{2}&lt;/oval:timestamp&gt;
    &lt;/generator&gt;\n""".format(OvalGenerator.generator_version, OvalGenerator.oval_schema_version, oval_timestamp))

            # add queued file content
            for element in OvalGenerator.supported_oval_elements:
                if element in self.tmp:
                    f.write("\n    &lt;{0}s&gt;\n".format(element))
                    f.write(self.tmp[element].read().rstrip())
                    f.write("\n    &lt;/{0}s&gt;".format(element))

            # add footer
            f.write("\n&lt;/oval_definitions&gt;")

        # close and delete queue files
        for key in self.tmp:
            self.tmp[key].close()
            os.remove(self.tmp[key].name)

        # close self.output_filepath and move into place
        f.close()
        shutil.move(tmp, os.path.join(self.output_dir, self.output_filepath))

        # remove tmp dir if empty
        if not os.listdir(self.tmpdir):
            os.rmdir(self.tmpdir)

    def unique_id_base(self, id_base, note):
        """ queue a warning message """
        if not hasattr(self, 'id_bases'):
            self.id_bases = {}
        is_unique = id_base not in self.id_bases.keys()
        if not is_unique:
            self.warn('ID Base collision {0} in {1} and {2}.'.format(
                id_base, note, self.id_bases[id_base]))
        self.id_bases[id_base] = note
        return is_unique

    def warn(self, message):
        """ print a warning message """
        print('WARNING: {0}'.format(message))


class OvalGeneratorUSN():
    supported_oval_elements = ('definition', 'test', 'object', 'state',
                               'variable')
    cve_base_url = 'https://ubuntu.com/security/{}'
    mitre_base_url = 'https://cve.mitre.org/cgi-bin/cvename.cgi?name={}'
    usn_base_url = 'https://ubuntu.com/security/notices/USN-{}'
    lookup_cve_path = ['./active', './retired']
    generator_version = '1'
    oval_schema_version = '5.11.1'
    priorities = {'negligible': 0, 'low': 1, 'medium': 2, 'high': 3, 'critical': 4}

    def __init__(self, release_codename, release_name, outdir='./', cve_dir=None, prefix='', oval_format='dpkg'):
        self.release_codename = release_codename.replace('/', '_')
        self.release_name = release_name
        self.current_oval = None
        self.tmpdir = tempfile.mkdtemp(prefix='oval_lib-')
        self.output_dir = outdir
        self.prefix = prefix
        self.oval_format = oval_format
        self.output_filepath = \
            '{0}com.ubuntu.{1}.usn.oval.xml'.format(prefix, self.release_codename)
        self.ns = 'oval:com.ubuntu.{0}'.format(self.release_codename)
        self.id = 100
        self.oval_structure = None
        self.load_oval_file_structures()
        self.create_release_oval_info()

    def load_oval_file_structures(self):
        _file = '{}.oval.usn'.format(self.release_codename)
        mode = 'w'

        self.oval_structure = {
            key: _open(os.path.join(self.tmpdir, '{}.{}.xml'.format(_file, key)),
                                   mode=mode, encoding='utf-8')  for key in
                    ['definition', 'test', 'object', 'state', 'variable']
        }

    # loads the release info either from oval_db or creating it
    def load_oval_release_struct(self):
        elements = {
            'definition': self.create_release_definition(),
            'test': self.create_release_test(),
            'object': self.create_release_object(),
            'state': self.create_release_state()
            }
        return elements

    # creates from scratch or just load from the oval_db
    def create_release_oval_info(self):
        oval_rel_struct = self.load_oval_release_struct()

        self.oval_structure['definition'].write(oval_rel_struct['definition'])
        self.oval_structure['test'].write(oval_rel_struct['test'])
        self.oval_structure['object'].write(oval_rel_struct['object'])
        self.oval_structure['state'].write(oval_rel_struct['state'])

    def create_release_definition(self):
        if self.oval_format == 'dpkg':
            mapping = {
                'id': self.id,
                'ns': self.ns,
                'title': "Check that {} ({})".format(self.release_name, self.release_codename),
                'comment': "{} ({})".format(self.release_name, self.release_codename)
            }

            definition =\
        """
        &lt;definition class="inventory" id="{ns}:def:{id}" version="1"&gt;
           &lt;metadata&gt;
              &lt;title&gt;{title} is installed.&lt;/title&gt;
              &lt;description&gt;&lt;/description&gt;
           &lt;/metadata&gt;
           &lt;criteria&gt;
              &lt;criterion test_ref="{ns}:tst:{id}" comment="{comment} is installed." /&gt;
           &lt;/criteria&gt;
        &lt;/definition&gt;""".format(**mapping)
        else:
            definition = ""

        return definition

    def create_release_test(self):
        if self.oval_format == 'dpkg':
            mapping = {
                'id': self.id,
                'ns': self.ns,
                'comment': "{} ({})".format(self.release_name, self.release_codename)
            }

            test =\
        """
        &lt;ind:textfilecontent54_test check="at least one" check_existence="at_least_one_exists" id="{ns}:tst:{id}" version="1" comment="{comment} is installed."&gt;
           &lt;ind:object object_ref="{ns}:obj:{id}" /&gt;
           &lt;ind:state state_ref="{ns}:ste:{id}" /&gt;
        &lt;/ind:textfilecontent54_test&gt;""".format(**mapping)
        else:
            test = ""

        return test

    def create_release_object(self):
        if self.oval_format == 'dpkg':
            mapping = {
                'id': self.id,
                'ns': self.ns,
            }

            _object =\
        """
        &lt;ind:textfilecontent54_object id="{ns}:obj:{id}" version="1"&gt;
           &lt;ind:filepath datatype="string"&gt;/etc/lsb-release&lt;/ind:filepath&gt;
             &lt;ind:pattern operation="pattern match"&gt;^[\s\S]*DISTRIB_CODENAME=([a-z]+)$&lt;/ind:pattern&gt;
           &lt;ind:instance datatype="int"&gt;1&lt;/ind:instance&gt;
        &lt;/ind:textfilecontent54_object&gt;""".format(**mapping)
        else:
            _object = ""

        return _object

    def create_release_state(self):
        if self.oval_format == 'dpkg':
            mapping = {
                'id': self.id,
                'ns': self.ns,
                'comment': "{}".format(self.release_name),
                'release_codename': self.release_codename,
            }

            state =\
        """
        &lt;ind:textfilecontent54_state id="{ns}:ste:{id}" version="1" comment="{comment}"&gt;
           &lt;ind:subexpression datatype="string" operation="equals"&gt;{release_codename}&lt;/ind:subexpression&gt;
        &lt;/ind:textfilecontent54_state&gt;""".format(**mapping)
        else:
            state = ""

        return state

    def create_bug_references(self, urls):
        bug_urls = []
        alien_urls = []
        bugs = ""

        for url in urls:
            is_bug = re.match("https?:\/\/(bugs\.)?launchpad\.net\/(.*\/\+bug|bugs)\/\d+", url)

            if is_bug:
                bug_urls.append(url)
            else:
                alien_urls.append(url)

        for bug in bug_urls:
            bugs += \
                """
                &lt;bug&gt;{}&lt;/bug&gt;
                """.format(bug)

        for alien in alien_urls:
            bugs += \
                """&lt;ref&gt;{}&lt;/ref&gt;
                """.format(alien)

        return bugs.strip()

    def create_cves_references(self, cves):
        references = ""
        for cve in cves:
            references += \
                    """              &lt;reference source="CVE" ref_url="{}" ref_id="{}"/&gt;
""".format(cve['CVE_URL'], cve['Candidate'])

        return references.strip()

    def get_usn_severity(self, cves):
        if not cves:
            return "None"

        max_severity = max(cves)
        if max_severity == 1 and cves.count(1) &gt;= 5:
            return 'Medium'

        usn_severity = [key for key in self.priorities.items()
                            if key[1] == max_severity][0][0]
        return usn_severity.capitalize()

    def create_usn_definition(self, usn_object, usn_number, id_base, test_refs, cve_dir):
        urls, cves_info = self.format_cves_info(usn_object['cves'], cve_dir)
        cve_references = self.create_cves_references(cves_info)
        bug_references = self.create_bug_references(urls)

        for cve in cves_info:
            if cve['Priority'] not in self.priorities:
                sys.stderr.write('\rERROR: {} in USN {} has a priority of {}, please assign a valid priority to the CVE. Defaulting to medium.\n'.format(cve['Candidate'],
                        usn_number, cve['Priority']))
                # Throw an error if the CVE's priority is not valid but assign
                # the CVE a priority of medium so it has a valid priority in
                # the oval file output
                cve['Priority'] = 'medium'

        usn_severity = self.get_usn_severity([self.priorities[cve['Priority']]
                                                        for cve in cves_info])

        product_description = get_subproject_description(self.release_codename)

        mapping = {
            'id': id_base,
            'usn_id': usn_object['id'],
            'ns': self.ns,
            'title': "{} -- {}".format(usn_object['id'], usn_object['title']),
            'plataform': "{}".format(self.release_name),
            'usn_url': self.usn_base_url.format(usn_object['id']),
            'description': escape(' '.join(usn_object['description'].strip().split('\n'))),
            'cves_references': cve_references,
            'bug_references': bug_references,
            'severity': usn_severity,
            'usn_timestamp': datetime.fromtimestamp(usn_object['timestamp'], tz=timezone.utc).strftime('%Y-%m-%d'),
            'criteria': '',
        }

        # convert number versions of binary pkgs into test criteria
        criteria = []
        for test_ref in test_refs:
            criteria.append('&lt;criterion test_ref="{0}:tst:{1}" comment="{2}" /&gt;'.format(self.ns, test_ref['testref_id'], product_description))
        mapping['criteria'] = '\n              '.join(criteria)

        definition = \
        """
        &lt;definition id="{ns}:def:{id}" version="1" class="patch"&gt;
           &lt;metadata&gt;
              &lt;title&gt;{title}&lt;/title&gt;
              &lt;affected family="unix"&gt;
                 &lt;platform&gt;{plataform}&lt;/platform&gt;
              &lt;/affected&gt;
              &lt;reference source="USN" ref_url="{usn_url}" ref_id="USN-{usn_id}"/&gt;
              {cves_references}
              &lt;description&gt;{description}&lt;/description&gt;
              &lt;advisory from="security@ubuntu.com"&gt;
                 &lt;severity&gt;{severity}&lt;/severity&gt;
                 &lt;issued date="{usn_timestamp}"/&gt;
                 {bug_references}
              &lt;/advisory&gt;
           &lt;/metadata&gt;
           &lt;criteria operator="OR"&gt;
              {criteria}
           &lt;/criteria&gt;
        &lt;/definition&gt;""".format(**mapping)

        return definition

    def create_usn_test(self, id_base, product):
        mapping = {
            'id': id_base,
            'ns': self.ns,
            'product': product,
        }

        if self.oval_format == 'dpkg':
            test = \
        """
        &lt;linux:dpkginfo_test id="{ns}:tst:{id}" version="1" check_existence="at_least_one_exists" check="at least one" comment="{product}"&gt;
          &lt;linux:object object_ref="{ns}:obj:{id}"/&gt;
          &lt;linux:state state_ref="{ns}:ste:{id}"/&gt;
        &lt;/linux:dpkginfo_test&gt;""".format(**mapping)
        else:
            test = \
        """
        &lt;ind:textfilecontent54_test id="{ns}:tst:{id}" version="1" check_existence="at_least_one_exists" check="at least one" comment="{product}"&gt;
          &lt;ind:object object_ref="{ns}:obj:{id}"/&gt;
          &lt;ind:state state_ref="{ns}:ste:{id}"/&gt;
        &lt;/ind:textfilecontent54_test&gt;""".format(**mapping)

        return test

    def create_usn_object(self, id_base, product):
        mapping = {
            'id': id_base,
            'ns': self.ns,
            'product': product,
        }

        if self.oval_format == 'dpkg':
            _object = \
        """
        &lt;linux:dpkginfo_object id="{ns}:obj:{id}" version="1" comment="{product}"&gt;
          &lt;linux:name var_ref="{ns}:var:{id}" var_check="at least one" /&gt;
        &lt;/linux:dpkginfo_object&gt;""".format(**mapping)
        else:
            mapping['path'] = "."
            mapping['filename'] = "manifest"

            _object = \
        """
        &lt;ind:textfilecontent54_object id="{ns}:obj:{id}" version="1" comment="{product}"&gt;
          &lt;ind:path&gt;{path}&lt;/ind:path&gt;
          &lt;ind:filename&gt;manifest&lt;/ind:filename&gt;
          &lt;ind:pattern operation="pattern match" datatype="string" var_ref="{ns}:var:{id}" var_check="at least one" /&gt;
          &lt;ind:instance operation="greater than or equal" datatype="int"&gt;1&lt;/ind:instance&gt;
        &lt;/ind:textfilecontent54_object&gt;""".format(**mapping)

        return _object

    def create_usn_state(self, binary_version, id_base, product):
        mapping = {
            'id': id_base,
            'ns': self.ns,
            'product': product,
        }

        if self.oval_format == 'dpkg':
            if binary_version.find(':') != -1:
                mapping['bversion'] = binary_version
            else:
                mapping['bversion'] = "0:" + binary_version

            state = \
        """
        &lt;linux:dpkginfo_state id="{ns}:ste:{id}" version="1" comment="{product}"&gt;
          &lt;linux:evr datatype="evr_string" operation="less than"&gt;{bversion}&lt;/linux:evr&gt;
        &lt;/linux:dpkginfo_state&gt;""".format(**mapping)

        else:
            mapping['bversion'] = binary_version

            state = \
        """
        &lt;ind:textfilecontent54_state id="{ns}:ste:{id}" version="1" comment="{product}"&gt;
          &lt;ind:subexpression datatype="evr_string" operation="less than"&gt;{bversion}&lt;/ind:subexpression&gt;
        &lt;/ind:textfilecontent54_state&gt;""".format(**mapping)

        return state

    def create_usn_variable(self, id_base, binaries_list, product):
        values = ""
        if self.oval_format == 'dpkg':
            for binary in binaries_list:
                values += \
            """&lt;value&gt;{}&lt;/value&gt;
            """.format(binary)
        else:
            for binary in binaries_list:
                values += \
            """&lt;value&gt;^{}\s+(.*)&lt;/value&gt;
            """.format(binary)

        mapping = {
            'id': id_base,
            'ns': self.ns,
            'values': values.strip(),
            'product': product,
        }

        constant_variable = \
        """
        &lt;constant_variable id="{ns}:var:{id}" version="1" datatype="string" comment="{product}"&gt;
            {values}
        &lt;/constant_variable&gt;""".format(**mapping)

        return constant_variable

    def get_cve_info_from_file(self, cve, cve_dir):
        cve_active_file_path = os.path.join(cve_dir, 'active', cve)
        cve_retired_file_path = os.path.join(cve_dir, 'retired', cve)

        if os.path.exists(cve_active_file_path):
            cve_file_path = cve_active_file_path
        elif os.path.exists(cve_retired_file_path):
            cve_file_path = cve_retired_file_path
        else:
            return None

        cve_object = load_cve(cve_file_path)
        if not cve_object:
            return None

        public_date = cve_object['PublicDate']
        priority = cve_object['Priority']

        # TODO: deal with multiple CVSS?
        cvss = None
        baseScore = None
        baseSeverity = None
        if cve_object['CVSS']:
            cvss = cve_object['CVSS'][0]['vector']
            baseScore = cve_object['CVSS'][0]['baseScore']
            baseSeverity = cve_object['CVSS'][0]['baseSeverity'].title()

        cve_info = {
                'Candidate': cve,
                'PublicDate': public_date,
                'Priority': priority,
                'CVSS': cvss,
                'CVSS_SEVERITY_LEVEL': baseSeverity,
                'CVSS_SCORE': baseScore,
                'CVE_URL': self.cve_base_url.format(cve),
                'MITRE_URL': self.mitre_base_url.format(cve)
                }

        return cve_info

    # FIXME: BUG: USN adds lp urls to 'cves': field, we need to filter it
    # and handle it separated till USN data be fixed
    def filter_cves(self, cves):
        _cves = cves[:]
        urls = []
        for cve in cves:
            # Takes urls from the list
            is_url = re.match('(www|http:|https:)+[^\s]+[\w]', cve)

            if is_url:
                urls.append(cve)
                _cves.remove(cve)

        return (urls, _cves)

    def format_cves_info(self, cves, cve_dir):
        urls, cves = self.filter_cves(cves)
        cves_info = []
        for cve in cves:
            res = self.get_cve_info_from_file(cve, cve_dir)
            if res:
                cves_info.append(res)

        return urls, cves_info

    def get_version_from_binaries(self, usn_allbinaries):
        version_map = collections.defaultdict(list)
        for k, v in usn_allbinaries.items():
            version_map[v['version']].append(k)

        return version_map

    def get_testref(self, version, pkgs, testref_id):
        new_id = '{0}0'.format(testref_id)
        return {'version': version, 'pkgs': pkgs, 'testref_id':new_id}

    def get_all_binaries_object(self, usn_object):
        usn_release = usn_object['releases'][self.release_codename]

        if 'allbinaries' in usn_release.keys():
            usn_allbinaries = usn_release['allbinaries']
        else:
            usn_allbinaries = usn_release['binaries']

        return usn_allbinaries

    def generate_usn_oval(self, usn_object, usn_number, cve_dir):
        if self.release_codename not in usn_object['releases'].keys():
            return

        usn_release = usn_object['releases'][self.release_codename]
        id_base = int(re.sub('[^0-9]','', usn_number)) * 1000000


        if 'allbinaries' in usn_release.keys():
            usn_allbinaries = usn_release['allbinaries']
        else:
            usn_allbinaries = usn_release['binaries']

        binary_versions = self.get_version_from_binaries(usn_allbinaries)

        product_description = get_subproject_description(self.release_codename)

        # group binaries with same version (most likely from same source)
        # and create a test_ref for the group to be used when creating
        # the oval def, test, state and var.
        test_refs = []
        for key in list(binary_versions):
            test_ref = self.get_testref(key, binary_versions[key], id_base + len(test_refs))
            if test_ref:
               test_refs.append(test_ref)

        # Create the oval objects
        # Only need one definition, but if multiple versions of binary pkgs,
        # then may need several test, object, state and var

        usn_def = self.create_usn_definition(usn_object, usn_number, id_base, test_refs, cve_dir)
        self.oval_structure['definition'].write(usn_def)

        for test_ref in test_refs:
            usn_test = self.create_usn_test(test_ref['testref_id'], product_description)
            usn_obj = self.create_usn_object(test_ref['testref_id'], product_description)
            usn_state = self.create_usn_state(test_ref['version'], test_ref['testref_id'], product_description)
            usn_variable = self.create_usn_variable(test_ref['testref_id'], test_ref['pkgs'], product_description)

            self.oval_structure['test'].write(usn_test)
            self.oval_structure['object'].write(usn_obj)
            self.oval_structure['state'].write(usn_state)
            self.oval_structure['variable'].write(usn_variable)

    def write_oval_elements(self):
        """ write OVAL elements to .xml file w. OVAL header and footer """
        for key in self.oval_structure:
            self.oval_structure[key].close()
            self.oval_structure[key] = open(self.oval_structure[key].name, 'rt')

        tmp = os.path.join(self.tmpdir, self.output_filepath)
        with open(tmp, 'wt') as f:
            # add header
            oval_timestamp = datetime.now(tz=timezone.utc).strftime(
                '%Y-%m-%dT%H:%M:%S')
            copyright_year = datetime.now(tz=timezone.utc).year
            header = \
"""&lt;oval_definitions
    xmlns="http://oval.mitre.org/XMLSchema/oval-definitions-5"
    xmlns:ind="http://oval.mitre.org/XMLSchema/oval-definitions-5#independent"
    xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5"
    xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix"
    xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd   http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd   http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd   http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd   http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd"&gt;

    &lt;generator&gt;
        &lt;oval:product_name&gt;Canonical USN OVAL Generator&lt;/oval:product_name&gt;
        &lt;oval:product_version&gt;{0}&lt;/oval:product_version&gt;
        &lt;oval:schema_version&gt;{1}&lt;/oval:schema_version&gt;
        &lt;oval:timestamp&gt;{2}&lt;/oval:timestamp&gt;
        &lt;terms_of_use&gt;Copyright (C) {3} Canonical LTD. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, version 3 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License version 3 for more details. You should have received a copy of the GNU General Public License version 3 along with this program.  If not, see http://www.gnu.org/licenses/.&lt;/terms_of_use&gt;
    &lt;/generator&gt;\n""".format(self.generator_version, self.oval_schema_version, oval_timestamp, copyright_year)

            f.write(header)
            # add queued file content
            for element in self.supported_oval_elements:
                if element in self.oval_structure:
                    f.write("\n    &lt;{0}s&gt;\n".format(element))
                    f.write(self.oval_structure[element].read().rstrip())
                    f.write("\n    &lt;/{0}s&gt;".format(element))

            # add footer
            footer = "\n&lt;/oval_definitions&gt;"
            f.write(footer)

        # close and delete queue files
        for key in self.oval_structure:
            self.oval_structure[key].close()
            os.remove(self.oval_structure[key].name)

        # close self.output_filepath and move into place
        f.close()
        shutil.move(tmp, os.path.join(self.output_dir, self.output_filepath))

        # remove tmp dir if empty
        if not os.listdir(self.tmpdir):
            os.rmdir(self.tmpdir)
</pre></body></html>