# -*- coding: utf-8 -*-
"""Utilities for generating, parsing and checking XML/XSD files on top of the lxml.etree module."""
import base64
import contextlib
import logging
import re
import zipfile
from io import BytesIO
import requests
from lxml import etree
from odoo.exceptions import UserError
from odoo.tools.misc import file_open
# Names exported by a star-import of this module. The other helpers defined
# below are not listed here and therefore have to be imported by name.
__all__ = [
    "cleanup_xml_node",
    "load_xsd_files_from_url",
    "validate_xml_from_attachment",
]
# Module-level logger used by the helpers of this file.
_logger = logging.getLogger(__name__)
def remove_control_characters(byte_node):
    """Drop from a byte string the bytes that XML does not accept.

    The characters to be escaped are the control characters #x0 to #x1F and #x7F (most of which cannot appear in XML)
    [...] XML processors must accept any character in the range specified for Char:
    `Char	   :: =   	#x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]`
    source:https://www.w3.org/TR/xml/

    The character class is spelled with code points but it is UTF-8 encoded and
    applied to *bytes*, hence it is matched byte by byte: the bytes 0x09, 0x0A,
    0x0D and 0x20-0xF4 are kept, all the others (0x00-0x08, 0x0B, 0x0C,
    0x0E-0x1F and 0xF5-0xFF) are removed.

    :param bytes byte_node: the serialized XML to sanitize
    :returns: a copy of ``byte_node`` without the rejected bytes
    :rtype: bytes
    """
    rejected_bytes = re.compile((
        '[^'
        '\u0009'
        '\u000A'
        '\u000D'
        '\u0020-\uD7FF'
        '\uE000-\uFFFD'
        '\U00010000-\U0010FFFF'
        ']'
    ).encode())
    return rejected_bytes.sub(b'', byte_node)
class odoo_resolver(etree.Resolver):
    """Odoo specific file resolver that can be added to the XML Parser.

    It will search filenames in the ir.attachments
    """

    def __init__(self, env, prefix):
        """Store what is needed to look attachments up.

        :param odoo.api.Environment env: environment used to search ``ir.attachment``
        :param str prefix: if set, it is prepended (followed by a dot) to every
            looked-up name
        """
        super().__init__()
        self.env = env
        self.prefix = prefix

    def resolve(self, url, id, context):
        """Search url in ``ir.attachment`` and return the resolved content.

        The parameter names (including ``id``) are kept as they are so that the
        method signature stays unchanged for existing callers.

        :param str url: name of the file to resolve
        :param id: public identifier of the resource (unused)
        :param context: opaque resolver context, forwarded to ``resolve_string``
        :returns: the resolved attachment content, or None when no attachment
            has the expected name
        """
        attachment_name = f'{self.prefix}.{url}' if self.prefix else url
        # limit=1: several attachments may share the same name, and reading
        # ``raw`` on more than one record would raise instead of resolving.
        attachment = self.env['ir.attachment'].search([('name', '=', attachment_name)], limit=1)
        if attachment:
            return self.resolve_string(attachment.raw, context)
        return None
def _validate_xml(env, url, path, xmls):
    """Validate XML document(s) against an XSD taken from a file path or a URL.

    The XSD is made available as an ``ir.attachment`` (created from ``path``, or
    obtained through :func:`load_xsd_files_from_url` for ``url``), each XML is
    checked with :func:`validate_xml_from_attachment`, and that attachment is
    deleted afterwards, even if it was an already existing record that
    ``load_xsd_files_from_url`` merely updated.

    When no XSD can be obtained (no path/URL given, or the download failed),
    the validation is skipped with a warning instead of crashing.

    :param odoo.api.Environment env: environment of calling module
    :param str url: URL of the XSD, only used when ``path`` is falsy
    :param str path: path of the XSD file, opened with ``file_open``
    :param xmls: one XML content, or a list of them
    """
    # Get the XSD data
    xsd_attachment = env['ir.attachment']
    if path:
        with file_open(path, filter_ext=('.xsd',)) as file:
            content = file.read()
        attachment_vals = {
            'name': path.split('/')[-1],
            'datas': base64.b64encode(content.encode()),
        }
        xsd_attachment = env['ir.attachment'].create(attachment_vals)
    elif url:
        # load_xsd_files_from_url() returns False on failure: keep a recordset
        xsd_attachment = load_xsd_files_from_url(env, url) or env['ir.attachment']
    if not xsd_attachment:
        _logger.warning("No XSD could be loaded (path: %s, url: %s), skipping validation", path, url)
        return

    # Validate the XML against the XSD
    if not isinstance(xmls, list):
        xmls = [xmls]
    try:
        for xml in xmls:
            validate_xml_from_attachment(env, xml, xsd_attachment.name)
    finally:
        # Do not leave the attachment behind when a validation raises
        xsd_attachment.unlink()
def _check_with_xsd(tree_or_str, stream, env=None, prefix=None):
    """Check an XML against an XSD schema.

    This will raise a UserError if the XML file is not valid according to the
    XSD file.

    :param str | etree._Element tree_or_str: representation of the tree to be checked
    :param io.IOBase | str stream: the byte stream used to build the XSD schema.
        If env is given, it can also be the name of an attachment in the filestore
    :param odoo.api.Environment env: If it is given, it enables resolving the
        imports of the schema in the filestore with ir.attachments.
    :param str prefix: if given, provides a prefix to try when
        resolving the imports of the schema. e.g. prefix='l10n_cl_edi' will
        enable 'SiiTypes_v10.xsd' to be resolved to 'l10n_cl_edi.SiiTypes_v10.xsd'.
    :raises FileNotFoundError: if ``env`` is given, ``stream`` is a name ending
        with '.xsd' and no attachment has that name
    :raises UserError: if the XML does not comply with the schema; the message
        lists the entries of the validation error log, one per line
    """
    if not isinstance(tree_or_str, etree._Element):
        tree_or_str = etree.fromstring(tree_or_str)
    parser = etree.XMLParser()
    if env:
        parser.resolvers.add(odoo_resolver(env, prefix))
        if isinstance(stream, str) and stream.endswith('.xsd'):
            # limit=1: several attachments may share the same name, and reading
            # ``raw`` on more than one record would raise a singleton error.
            attachment = env['ir.attachment'].search([('name', '=', stream)], limit=1)
            if not attachment:
                raise FileNotFoundError(f"XSD attachment not found: {stream}")
            stream = BytesIO(attachment.raw)
    xsd_schema = etree.XMLSchema(etree.parse(stream, parser=parser))
    try:
        xsd_schema.assertValid(tree_or_str)
    except etree.DocumentInvalid as xml_errors:
        raise UserError('\n'.join(str(e) for e in xml_errors.error_log)) from xml_errors
def create_xml_node_chain(first_parent_node, nodes_list, last_node_value=None):
    """Build a chain of nested nodes below ``first_parent_node``.

    One node is created per tag of ``nodes_list``; the first one is a child of
    ``first_parent_node`` and every following one is a child of the node
    created just before it.

    Note that when ``nodes_list`` is empty, ``last_node_value`` (if given) is
    set as the text of ``first_parent_node`` itself.

    :param etree._Element first_parent_node: parent of the created tree/chain
    :param Iterable[str] nodes_list: tag names to be created
    :param str last_node_value: if specified, set the last node's text to this value
    :returns: the list of created nodes
    :rtype: list[etree._Element]
    """
    # The chain is seeded with the parent so that chain[-1] is always the
    # deepest node reached so far.
    chain = [first_parent_node]
    for tag_name in nodes_list:
        chain.append(etree.SubElement(chain[-1], tag_name))
    if last_node_value is not None:
        chain[-1].text = last_node_value
    # Only the nodes created here are returned, not the seed
    return chain[1:]
def create_xml_node(parent_node, node_name, node_value=None):
    """Append a single new node to ``parent_node``.

    This is the one-tag case of :func:`create_xml_node_chain`.

    :param etree._Element parent_node: parent of the created node
    :param str node_name: name of the created node
    :param str node_value: value of the created node (optional)
    :returns: the created node
    :rtype: etree._Element
    """
    # A one-tag chain yields exactly one node
    (new_node,) = create_xml_node_chain(parent_node, [node_name], last_node_value=node_value)
    return new_node
def cleanup_xml_node(xml_node_or_string, remove_blank_text=True, remove_blank_nodes=True, indent_level=0, indent_space="  "):
    """Tidy the sub-tree of an XML node: blank texts, empty leaves and indentation.

    Depending on the type of the provided XML node:
    - etree._Element: it is modified in-place.
    - string/bytes: it is parsed into an etree._Element first (control
      characters removed, parser in recovery mode, entities not resolved).

    :param xml_node_or_string (etree._Element, str): XML node (or its string/bytes representation)
    :param remove_blank_text (bool): if True, removes whitespace-only text from nodes
    :param remove_blank_nodes (bool): if True, removes leaf nodes with no text (iterative, depth-first, done after remove_blank_text)
    :param indent_level (int): depth or level of node within root tree (use -1 to leave indentation as-is)
    :param indent_space (str): string to use for indentation (use '' to remove all indentation)
    :returns (etree._Element): clean node, same instance that was received (if applicable)
    """
    root = xml_node_or_string

    # str -> bytes -> etree._Element (fromstring is fed with bytes here)
    if isinstance(root, str):
        root = root.encode()
    if isinstance(root, bytes):
        root = etree.fromstring(
            remove_control_characters(root),
            parser=etree.XMLParser(recover=True, resolve_entities=False),
        )

    def is_blank(text):
        # None, empty or whitespace-only
        return not text or not text.strip()

    def visit(parent, element, depth):
        # Children first: an inner node whose children all get removed is
        # then handled as a leaf further down.
        child_depth = depth + 1 if depth >= 0 else depth
        for child in element:
            visit(element, child, child_depth)

        # Indentation (skipped altogether for a negative depth)
        if depth >= 0:
            own_indent = '\n' + indent_space * depth
            child_indent = own_indent + indent_space
            if is_blank(element.tail):
                element.tail = own_indent if parent is not None else '\n'
            if len(element) > 0:
                if is_blank(element.text):
                    # The text of a parent is what indents its first child
                    element.text = child_indent
                closing_child = element[-1]
                if closing_child.tail == child_indent:
                    # The tail of the last child indents the parent's closing tag
                    closing_child.tail = own_indent

        # Only leaves can be blanked/removed: never the root nor an inner node
        if parent is None or len(element) > 0:
            return
        if remove_blank_text and element.text is not None and not element.text.strip():
            # text None means a self-closing tag; '' produces a closing tag
            element.text = ''
        if remove_blank_nodes and not element.text:
            parent.remove(element)

    visit(None, root, indent_level)
    return root
def _save_xsd_attachment(env, prefixed_xsd_name, content):
    """Store one XSD file as ``ir.attachment``, updating the existing one if any.

    :param odoo.api.Environment env: environment of calling module
    :param str prefixed_xsd_name: full name of the attachment
    :param bytes content: content of the XSD file
    :returns: the attachment that was updated or created
    """
    attachment = env['ir.attachment'].search([('name', '=', prefixed_xsd_name)], limit=1)
    if attachment:
        _logger.info("Updating the content of ir.attachment with name: %s", prefixed_xsd_name)
        attachment.raw = content
        return attachment
    _logger.info("Saving XSD file as ir.attachment, with name: %s", prefixed_xsd_name)
    return env['ir.attachment'].create({
        'name': prefixed_xsd_name,
        'raw': content,
        'public': True,
    })


def load_xsd_files_from_url(env, url, file_name=None, force_reload=False,
                            request_max_timeout=10, xsd_name_prefix='', xsd_names_filter=None, modify_xsd_content=None):
    """Load XSD file or ZIP archive. Save XSD files as ir.attachment.

    An XSD attachment is saved as {xsd_name_prefix}.{filename} where the filename is either the filename obtained
    from the URL or from the ZIP archive, or the `file_name` param if it is specified and a single XSD is being downloaded.
    A typical prefix is the calling module's name.

    For ZIP archives, XSD files inside it will be saved as attachments, depending on the provided list of XSD names.
    ZIP archive themselves are not saved.

    The XSD files content can be modified by providing the `modify_xsd_content` function as argument.
    Typically, this is used when XSD files depend on each other (with the schemaLocation attribute),
    but it can be used for any purpose.

    :param odoo.api.Environment env: environment of calling module
    :param str url: URL of XSD file/ZIP archive
    :param str file_name: used as attachment name if the URL leads to a single XSD, otherwise ignored
    :param bool force_reload: Deprecated.
    :param int request_max_timeout: maximum time (in seconds) before the request times out
    :param str xsd_name_prefix: if provided, will be added as a prefix to every XSD file name
    :param list | str xsd_names_filter: if provided, will only save the XSD files with these names
        (a single string is handled as one file name); only used for ZIP archives
    :param func modify_xsd_content: function that takes the xsd content as argument and returns a modified version of it
    :rtype: odoo.api.ir.attachment | bool
    :return: every XSD attachment created/fetched (possibly none for a ZIP archive without
        matching XSD) or False if the download failed or was empty (see warning logs)
    """
    try:
        _logger.info("Fetching file/archive from given URL: %s", url)
        response = requests.get(url, timeout=request_max_timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as error:
        _logger.warning('HTTP error: %s with the given URL: %s', error, url)
        return False
    except requests.exceptions.ConnectionError as error:
        _logger.warning('Connection error: %s with the given URL: %s', error, url)
        return False
    except requests.exceptions.Timeout as error:
        _logger.warning('Request timeout: %s with the given URL: %s', error, url)
        return False
    except requests.exceptions.RequestException as error:
        # Any other failure of the request (too many redirects, invalid URL...)
        # honours the same "return False and log a warning" contract.
        _logger.warning('Request error: %s with the given URL: %s', error, url)
        return False

    content = response.content
    if not content:
        _logger.warning("The HTTP response from %s is empty (no content)", url)
        return False

    # A bare string would make the `in` test below a substring match
    # ('c.xsd' in 'abc.xsd'): handle it as a list holding a single name.
    if isinstance(xsd_names_filter, str):
        xsd_names_filter = [xsd_names_filter]

    # Content that cannot be opened as a ZIP archive is handled as one XSD file
    archive = None
    with contextlib.suppress(zipfile.BadZipFile):
        archive = zipfile.ZipFile(BytesIO(content))

    if archive is None:
        if modify_xsd_content:
            content = modify_xsd_content(content)
        if not file_name:
            file_name = url.split('/')[-1]
            _logger.info("XSD name not provided, defaulting to %s", file_name)
        prefixed_xsd_name = f"{xsd_name_prefix}.{file_name}" if xsd_name_prefix else file_name
        return _save_xsd_attachment(env, prefixed_xsd_name, content)

    saved_attachments = env['ir.attachment']
    with archive:
        for file_path in archive.namelist():
            if not file_path.endswith('.xsd'):
                continue

            # Attachments are named after the file only, not its path in the archive
            file_name = file_path.rsplit('/', 1)[-1]
            if xsd_names_filter and file_name not in xsd_names_filter:
                _logger.info("Skipping file with name %s in ZIP archive", file_name)
                continue

            try:
                content = archive.read(file_path)
            except KeyError:
                _logger.warning("Failed to retrieve XSD file with name %s from ZIP archive", file_name)
                continue
            if modify_xsd_content:
                content = modify_xsd_content(content)

            prefixed_xsd_name = f"{xsd_name_prefix}.{file_name}" if xsd_name_prefix else file_name
            saved_attachments |= _save_xsd_attachment(env, prefixed_xsd_name, content)

    return saved_attachments
def validate_xml_from_attachment(env, xml_content, xsd_name, reload_files_function=None, prefix=None):
    """Validate some XML content against an XSD stored as attachment, when possible.

    The check itself is done by :func:`odoo.tools.xml_utils._check_with_xsd`.
    A missing XSD attachment, as well as an XSD that cannot be parsed as a
    schema, are only logged: in both cases nothing is raised.

    :param odoo.api.Environment env: environment of calling module
    :param xml_content: the XML content to validate
    :param xsd_name: the XSD file name in database
    :param reload_files_function: Deprecated.
    :param prefix: if given, it is prepended (followed by a dot) to ``xsd_name``
        and also passed on to ``_check_with_xsd``
    :return: None
    """
    schema_name = xsd_name if not prefix else f"{prefix}.{xsd_name}"
    _logger.info("Validating with XSD...")
    try:
        _check_with_xsd(xml_content, schema_name, env, prefix)
    except FileNotFoundError:
        _logger.info("XSD file not found, skipping validation")
    except etree.XMLSchemaParseError as schema_error:
        _logger.error("XSD file not valid: ")
        for detail in schema_error.args:
            _logger.error(detail)
    else:
        _logger.info("XSD validation successful!")
def find_xml_value(xpath, xml_element, namespaces=None):
    """Return the text of the first result of an XPath query.

    :param str xpath: the XPath expression to evaluate
    :param xml_element: the object the expression is evaluated on, through its
        ``xpath()`` method
    :param namespaces: passed as is to ``xpath()``
    :returns: the ``text`` of the first result, or None when the query has no
        (or an empty) result
    """
    matches = xml_element.xpath(xpath, namespaces=namespaces)
    if not matches:
        return None
    return matches[0].text
 |