facho: clip.py se adiciona nuevo comando sign-verify-xml
FossilOrigin-Name: 66279b3511c7baabe491c86ecfce3b7680df8a8f049219baa4dcbdef42349f24
This commit is contained in:
		
							
								
								
									
										27
									
								
								facho/cli.py
									
									
									
									
									
								
							
							
						
						
									
										27
									
								
								facho/cli.py
									
									
									
									
									
								
							| @@ -281,7 +281,31 @@ def generate_invoice(private_key, passphrase, scriptname, generate=False, ssl=Tr | |||||||
|         else: |         else: | ||||||
|             with open(output, 'w') as f: |             with open(output, 'w') as f: | ||||||
|                 f.write(xmlstring) |                 f.write(xmlstring) | ||||||
|      |  | ||||||
|  |  | ||||||
|  | @click.command() | ||||||
|  | @click.option('--private-key', type=click.Path(exists=True)) | ||||||
|  | @click.option('--passphrase') | ||||||
|  | @click.option('--ssl/--no-ssl', default=False) | ||||||
|  | @click.option('--use-cache-policy/--no-use-cache-policy', default=False) | ||||||
|  | @click.argument('xmlfile', type=click.Path(exists=True), required=True) | ||||||
|  | def sign_verify_xml(private_key, passphrase, xmlfile, ssl=True, use_cache_policy=False, output=None): | ||||||
|  |     if not ssl: | ||||||
|  |         disable_ssl() | ||||||
|  |          | ||||||
|  |     from facho.fe import fe | ||||||
|  |     if use_cache_policy: | ||||||
|  |         warnings.warn("xades using cache policy") | ||||||
|  |  | ||||||
|  |     print("THIS ONLY WORKS FOR DOCUMENTS GENERATE WITH FACHO") | ||||||
|  |     signer = fe.DianXMLExtensionSignerVerifier(private_key, passphrase=passphrase, mockpolicy=use_cache_policy) | ||||||
|  |     document = open(xmlfile, 'r').read().encode('utf-8') | ||||||
|  |  | ||||||
|  |     if signer.verify_string(document): | ||||||
|  |         print("+OK") | ||||||
|  |     else: | ||||||
|  |         print("-INVALID") | ||||||
|  |  | ||||||
| @click.group() | @click.group() | ||||||
| def main(): | def main(): | ||||||
|     pass |     pass | ||||||
| @@ -296,3 +320,4 @@ main.add_command(soap_get_numbering_range) | |||||||
| main.add_command(generate_invoice) | main.add_command(generate_invoice) | ||||||
| main.add_command(validate_invoice) | main.add_command(validate_invoice) | ||||||
| main.add_command(sign_xml) | main.add_command(sign_xml) | ||||||
|  | main.add_command(sign_verify_xml) | ||||||
|   | |||||||
| @@ -18,7 +18,12 @@ SCHEME_AGENCY_ATTRS = { | |||||||
|     'schemeAgencyID': '195' |     'schemeAgencyID': '195' | ||||||
| } | } | ||||||
|  |  | ||||||
|                        |          | ||||||
|  | # RESOLUCION 0001: pagina 516 | ||||||
|  | POLICY_ID = 'https://facturaelectronica.dian.gov.co/politicadefirma/v2/politicadefirmav2.pdf' | ||||||
|  | POLICY_NAME = u'Política de firma para facturas electrónicas de la República de Colombia.' | ||||||
|  |  | ||||||
|  |  | ||||||
| NAMESPACES = { | NAMESPACES = { | ||||||
|     'fe': 'http://www.dian.gov.co/contratos/facturaelectronica/v1', |     'fe': 'http://www.dian.gov.co/contratos/facturaelectronica/v1', | ||||||
|     'cac': 'urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2', |     'cac': 'urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2', | ||||||
| @@ -36,6 +41,22 @@ NAMESPACES = { | |||||||
|     'sig': 'http://www.w3.org/2000/09/xmldsig#', |     'sig': 'http://www.w3.org/2000/09/xmldsig#', | ||||||
| } | } | ||||||
|  |  | ||||||
|  | from contextlib import contextmanager | ||||||
|  | @contextmanager | ||||||
|  | def mock_xades_policy(): | ||||||
|  |     from mock import patch | ||||||
|  |     import os.path | ||||||
|  |     with patch('xades.policy.urllib.urlopen') as mock: | ||||||
|  |         class UrllibPolicyMock: | ||||||
|  |             def read(self): | ||||||
|  |                 cur_dir = os.path.dirname(os.path.abspath(__file__)) | ||||||
|  |                 data_dir = os.path.join(cur_dir, 'data', 'dian') | ||||||
|  |                 policy_file = os.path.join(data_dir, 'politicadefirmav2.pdf') | ||||||
|  |                 with open(policy_file, 'rb') as f: | ||||||
|  |                     return f.read() | ||||||
|  |  | ||||||
|  |         mock.return_value = UrllibPolicyMock() | ||||||
|  |         yield | ||||||
|  |  | ||||||
| class FeXML(FachoXML): | class FeXML(FachoXML): | ||||||
|  |  | ||||||
| @@ -177,10 +198,6 @@ class DianXMLExtensionSoftwareSecurityCode(FachoXMLExtension): | |||||||
|  |  | ||||||
|      |      | ||||||
| class DianXMLExtensionSigner(FachoXMLExtension): | class DianXMLExtensionSigner(FachoXMLExtension): | ||||||
|     # RESOLUCION 0001: pagina 516 |  | ||||||
|     POLICY_ID = 'https://facturaelectronica.dian.gov.co/politicadefirma/v2/politicadefirmav2.pdf' |  | ||||||
|     POLICY_NAME = u'Política de firma para facturas electrónicas de la República de Colombia.' |  | ||||||
|      |  | ||||||
|      |      | ||||||
|     def __init__(self, pkcs12_path, passphrase=None, mockpolicy=False): |     def __init__(self, pkcs12_path, passphrase=None, mockpolicy=False): | ||||||
|         self._pkcs12_path = pkcs12_path |         self._pkcs12_path = pkcs12_path | ||||||
| @@ -243,26 +260,15 @@ class DianXMLExtensionSigner(FachoXMLExtension): | |||||||
|         xades.template.add_claimed_role(props, "supplier") |         xades.template.add_claimed_role(props, "supplier") | ||||||
|          |          | ||||||
|         policy = xades.policy.GenericPolicyId( |         policy = xades.policy.GenericPolicyId( | ||||||
|             self.POLICY_ID, |             POLICY_ID, | ||||||
|             self.POLICY_NAME, |             POLICY_NAME, | ||||||
|             xmlsig.constants.TransformSha256) |             xmlsig.constants.TransformSha256) | ||||||
|         ctx = xades.XAdESContext(policy) |         ctx = xades.XAdESContext(policy) | ||||||
|         ctx.load_pkcs12(OpenSSL.crypto.load_pkcs12(open(self._pkcs12_path, 'rb').read(), |         ctx.load_pkcs12(OpenSSL.crypto.load_pkcs12(open(self._pkcs12_path, 'rb').read(), | ||||||
|                                                    self._passphrase)) |                                                    self._passphrase)) | ||||||
|  |  | ||||||
|         if self._mockpolicy: |         if self._mockpolicy: | ||||||
|             from mock import patch |             with mock_xades_policy(): | ||||||
|             import os.path |  | ||||||
|             with patch('xades.policy.urllib.urlopen') as mock: |  | ||||||
|                 class UrllibPolicyMock: |  | ||||||
|                     def read(self): |  | ||||||
|                         cur_dir = os.path.dirname(os.path.abspath(__file__)) |  | ||||||
|                         data_dir = os.path.join(cur_dir, 'data', 'dian') |  | ||||||
|                         policy_file = os.path.join(data_dir, 'politicadefirmav2.pdf') |  | ||||||
|                         with open(policy_file, 'rb') as f: |  | ||||||
|                             return f.read() |  | ||||||
|  |  | ||||||
|                 mock.return_value = UrllibPolicyMock() |  | ||||||
|                 ctx.sign(signature) |                 ctx.sign(signature) | ||||||
|                 ctx.verify(signature) |                 ctx.verify(signature) | ||||||
|         else: |         else: | ||||||
| @@ -358,3 +364,38 @@ class DianZIP: | |||||||
|  |  | ||||||
|     def __exit__(self, type, value, traceback): |     def __exit__(self, type, value, traceback): | ||||||
|         return self.zipfile.close() |         return self.zipfile.close() | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class DianXMLExtensionSignerVerifier: | ||||||
|  |      | ||||||
|  |     def __init__(self, pkcs12_path, passphrase=None, mockpolicy=False): | ||||||
|  |         self._pkcs12_path = pkcs12_path | ||||||
|  |         self._passphrase = None | ||||||
|  |         self._mockpolicy = mockpolicy | ||||||
|  |         if passphrase: | ||||||
|  |             self._passphrase = passphrase.encode('utf-8') | ||||||
|  |  | ||||||
|  |     def verify_string(self, document): | ||||||
|  |         xml = LXMLBuilder.from_string(document) | ||||||
|  |         fachoxml = FachoXML(xml,nsmap=NAMESPACES) | ||||||
|  |  | ||||||
|  |         signature = fachoxml.builder.xpath(fachoxml.root, '//ds:Signature') | ||||||
|  |         assert signature is not None | ||||||
|  |          | ||||||
|  |         signature.getparent().remove(signature) | ||||||
|  |         fachoxml.root.append(signature) | ||||||
|  |  | ||||||
|  |         ctx = xades.XAdESContext() | ||||||
|  |         ctx.load_pkcs12(OpenSSL.crypto.load_pkcs12(open(self._pkcs12_path, 'rb').read(), | ||||||
|  |                                                    self._passphrase)) | ||||||
|  |  | ||||||
|  |         try: | ||||||
|  |             if self._mockpolicy: | ||||||
|  |                 with mock_xades_policy(): | ||||||
|  |                     ctx.verify(signature) | ||||||
|  |             else: | ||||||
|  |                 ctx.verify(signature) | ||||||
|  |             return True | ||||||
|  |         except: | ||||||
|  |             return False | ||||||
|  |          | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user