diff --git a/facho/cli.py b/facho/cli.py index b30b1a9..69e1ced 100644 --- a/facho/cli.py +++ b/facho/cli.py @@ -281,7 +281,31 @@ def generate_invoice(private_key, passphrase, scriptname, generate=False, ssl=Tr else: with open(output, 'w') as f: f.write(xmlstring) - + + +@click.command() +@click.option('--private-key', type=click.Path(exists=True)) +@click.option('--passphrase') +@click.option('--ssl/--no-ssl', default=False) +@click.option('--use-cache-policy/--no-use-cache-policy', default=False) +@click.argument('xmlfile', type=click.Path(exists=True), required=True) +def sign_verify_xml(private_key, passphrase, xmlfile, ssl=True, use_cache_policy=False, output=None): + if not ssl: + disable_ssl() + + from facho.fe import fe + if use_cache_policy: + warnings.warn("xades using cache policy") + + print("THIS ONLY WORKS FOR DOCUMENTS GENERATE WITH FACHO") + signer = fe.DianXMLExtensionSignerVerifier(private_key, passphrase=passphrase, mockpolicy=use_cache_policy) + document = open(xmlfile, 'r').read().encode('utf-8') + + if signer.verify_string(document): + print("+OK") + else: + print("-INVALID") + @click.group() def main(): pass @@ -296,3 +320,4 @@ main.add_command(soap_get_numbering_range) main.add_command(generate_invoice) main.add_command(validate_invoice) main.add_command(sign_xml) +main.add_command(sign_verify_xml) diff --git a/facho/fe/fe.py b/facho/fe/fe.py index 7de3ed2..645c3e0 100644 --- a/facho/fe/fe.py +++ b/facho/fe/fe.py @@ -18,7 +18,12 @@ SCHEME_AGENCY_ATTRS = { 'schemeAgencyID': '195' } - + +# RESOLUCION 0001: pagina 516 +POLICY_ID = 'https://facturaelectronica.dian.gov.co/politicadefirma/v2/politicadefirmav2.pdf' +POLICY_NAME = u'Política de firma para facturas electrónicas de la República de Colombia.' + + NAMESPACES = { 'fe': 'http://www.dian.gov.co/contratos/facturaelectronica/v1', 'cac': 'urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2', @@ -36,6 +41,22 @@ NAMESPACES = { 'sig': 'http://www.w3.org/2000/09/xmldsig#', } +from contextlib import contextmanager +@contextmanager +def mock_xades_policy(): + from mock import patch + import os.path + with patch('xades.policy.urllib.urlopen') as mock: + class UrllibPolicyMock: + def read(self): + cur_dir = os.path.dirname(os.path.abspath(__file__)) + data_dir = os.path.join(cur_dir, 'data', 'dian') + policy_file = os.path.join(data_dir, 'politicadefirmav2.pdf') + with open(policy_file, 'rb') as f: + return f.read() + + mock.return_value = UrllibPolicyMock() + yield class FeXML(FachoXML): @@ -177,10 +198,6 @@ class DianXMLExtensionSoftwareSecurityCode(FachoXMLExtension): class DianXMLExtensionSigner(FachoXMLExtension): - # RESOLUCION 0001: pagina 516 - POLICY_ID = 'https://facturaelectronica.dian.gov.co/politicadefirma/v2/politicadefirmav2.pdf' - POLICY_NAME = u'Política de firma para facturas electrónicas de la República de Colombia.' - def __init__(self, pkcs12_path, passphrase=None, mockpolicy=False): self._pkcs12_path = pkcs12_path @@ -243,26 +260,15 @@ class DianXMLExtensionSigner(FachoXMLExtension): xades.template.add_claimed_role(props, "supplier") policy = xades.policy.GenericPolicyId( - self.POLICY_ID, - self.POLICY_NAME, + POLICY_ID, + POLICY_NAME, xmlsig.constants.TransformSha256) ctx = xades.XAdESContext(policy) ctx.load_pkcs12(OpenSSL.crypto.load_pkcs12(open(self._pkcs12_path, 'rb').read(), self._passphrase)) if self._mockpolicy: - from mock import patch - import os.path - with patch('xades.policy.urllib.urlopen') as mock: - class UrllibPolicyMock: - def read(self): - cur_dir = os.path.dirname(os.path.abspath(__file__)) - data_dir = os.path.join(cur_dir, 'data', 'dian') - policy_file = os.path.join(data_dir, 'politicadefirmav2.pdf') - with open(policy_file, 'rb') as f: - return f.read() - - mock.return_value = UrllibPolicyMock() + with mock_xades_policy(): ctx.sign(signature) ctx.verify(signature) else: @@ -358,3 +364,38 @@ class DianZIP: def __exit__(self, type, value, traceback): return self.zipfile.close() + + +class DianXMLExtensionSignerVerifier: + + def __init__(self, pkcs12_path, passphrase=None, mockpolicy=False): + self._pkcs12_path = pkcs12_path + self._passphrase = None + self._mockpolicy = mockpolicy + if passphrase: + self._passphrase = passphrase.encode('utf-8') + + def verify_string(self, document): + xml = LXMLBuilder.from_string(document) + fachoxml = FachoXML(xml,nsmap=NAMESPACES) + + signature = fachoxml.builder.xpath(fachoxml.root, '//ds:Signature') + assert signature is not None + + signature.getparent().remove(signature) + fachoxml.root.append(signature) + + ctx = xades.XAdESContext() + ctx.load_pkcs12(OpenSSL.crypto.load_pkcs12(open(self._pkcs12_path, 'rb').read(), + self._passphrase)) + + try: + if self._mockpolicy: + with mock_xades_policy(): + ctx.verify(signature) + else: + ctx.verify(signature) + return True + except: + return False +