oc-facho/facho/fe/fe.py
bit4bit aa3c83933d se le saca filo al machete
FossilOrigin-Name: 3c8cccda8e119b5d1d0edbce2ec326aa2f99053024efd9dd21c2092dacd08889
2022-05-11 01:17:48 +00:00

522 lines
20 KiB
Python

# This file is part of facho. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
from ..facho import FachoXML, FachoXMLExtension, LXMLBuilder
import uuid
import xmlsig
import xades
from datetime import datetime
import OpenSSL
import zipfile
import warnings
import hashlib
from contextlib import contextmanager
from .data.dian import codelist
from . import form
from collections import defaultdict
from pathlib import Path
AMBIENTE_PRUEBAS = codelist.TipoAmbiente.by_name('Pruebas')['code']
AMBIENTE_PRODUCCION = codelist.TipoAmbiente.by_name('Producción')['code']
SCHEME_AGENCY_ATTRS = {
'schemeAgencyName': 'CO, DIAN (Dirección de Impuestos y Aduanas Nacionales)',
'schemeAgencyID': '195'
}
# RESOLUCION 0001: pagina 516
POLICY_ID = 'https://facturaelectronica.dian.gov.co/politicadefirma/v2/politicadefirmav2.pdf'
POLICY_NAME = u'Política de firma para facturas electrónicas de la República de Colombia.'
NAMESPACES = {
'atd': 'urn:oasis:names:specification:ubl:schema:xsd:AttachedDocument-2',
'nomina': 'dian:gov:co:facturaelectronica:NominaIndividual',
'fe': 'http://www.dian.gov.co/contratos/facturaelectronica/v1',
'xs': 'http://www.w3.org/2001/XMLSchema-instance',
'cac': 'urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2',
'cbc': 'urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2',
'cdt': 'urn:DocumentInformation:names:specification:ubl:colombia:schema:xsd:DocumentInformationAggregateComponents-1',
'clm54217': 'urn:un:unece:uncefact:codelist:specification:54217:2001',
'clmIANAMIMEMediaType': 'urn:un:unece:uncefact:codelist:specification:IANAMIMEMediaType:2003',
'ext': 'urn:oasis:names:specification:ubl:schema:xsd:CommonExtensionComponents-2',
'qdt': 'urn:oasis:names:specification:ubl:schema:xsd:QualifiedDatatypes-2',
'sts': 'dian:gov:co:facturaelectronica:Structures-2-1',
'udt': 'urn:un:unece:uncefact:data:specification:UnqualifiedDataTypesSchemaModule:2',
'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
'xades': 'http://uri.etsi.org/01903/v1.3.2#',
'xades141': 'http://uri.etsi.org/01903/v1.4.1#',
'ds': 'http://www.w3.org/2000/09/xmldsig#',
'sig': 'http://www.w3.org/2000/09/xmldsig#',
}
def fe_from_string(document: str) -> FachoXML:
return FeXML.from_string(document)
from contextlib import contextmanager
@contextmanager
def mock_xades_policy():
from mock import patch
import os.path
with patch('xades.policy.urllib.urlopen') as mock:
class UrllibPolicyMock:
def read(self):
# Usamos contenido de archivo local
cur_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = os.path.join(cur_dir, 'data', 'dian')
policy_file = os.path.join(data_dir, 'politicadefirmav2.pdf')
with open(policy_file, 'rb') as f:
return f.read()
mock.return_value = UrllibPolicyMock()
yield
class FeXML(FachoXML):
def __init__(self, root, namespace):
super().__init__("{%s}%s" % (namespace, root),
nsmap=NAMESPACES)
@classmethod
def from_string(cls, document: str) -> 'FeXML':
return super().from_string(document, namespaces=NAMESPACES)
def tostring(self, **kw):
# MACHETE(bit4bit) la DIAN espera que la etiqueta raiz no este en un namespace
root_namespace = self.root_namespace()
xmlns_name = {v: k for k, v in NAMESPACES.items()}[root_namespace]
return super().tostring(**kw)\
.replace(xmlns_name + ':', '')\
.replace('xmlns:'+xmlns_name, 'xmlns')
class DianXMLExtensionCUDFE(FachoXMLExtension):
def __init__(self, invoice, tipo_ambiente = AMBIENTE_PRUEBAS):
self.tipo_ambiente = tipo_ambiente
self.invoice = invoice
def _tipo_ambiente_int(self):
return int(self.tipo_ambiente)
def formatVars(self, invoice):
raise NotImplementedError()
def schemeName(self):
raise NotImplementedError()
def _get_qrcode(self, cufe):
url_for = {
AMBIENTE_PRUEBAS: 'https://catalogo-vpfe-hab.dian.gov.co/document/searchqr?documentkey=',
AMBIENTE_PRODUCCION: 'https://catalogo-vpfe.dian.gov.co/document/searchqr?documentkey='
}
return url_for[self.tipo_ambiente] + cufe
def build(self, fachoxml):
cufe = self._generate_cufe()
fachoxml.set_element('./cbc:UUID', cufe,
schemeID=self.tipo_ambiente,
schemeName=self.schemeName())
#DIAN 1.8.-2021: FAD03
fachoxml.set_element('./cbc:ProfileID', 'DIAN 2.1: Factura Electrónica de Venta')
fachoxml.set_element('./cbc:ProfileExecutionID', self._tipo_ambiente_int())
#DIAN 1.7.-2020: FAB36
fachoxml.set_element('./ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/sts:DianExtensions/sts:QRCode',
self._get_qrcode(cufe))
def issue_time(self, datetime_):
return datetime_.strftime('%H:%M:%S-05:00')
def issue_date(self, datetime_):
return datetime_.strftime('%Y-%m-%d')
def buildVars(self):
invoice = self.invoice
build_vars = {}
build_vars['NumFac'] = invoice.invoice_ident
build_vars['FecFac'] = self.issue_date(invoice.invoice_issue)
build_vars['HoraFac'] = self.issue_time(invoice.invoice_issue)
# PAG 601
build_vars['ValorBruto'] = invoice.invoice_legal_monetary_total.line_extension_amount
build_vars['ValorTotalPagar'] = invoice.invoice_legal_monetary_total.payable_amount
ValorImpuestoPara = defaultdict(lambda: form.Amount(0.0))
build_vars['CodImpuesto1'] = '01'
build_vars['CodImpuesto2'] = '04'
build_vars['CodImpuesto3'] = '03'
for invoice_line in invoice.invoice_lines:
for subtotal in invoice_line.tax.subtotals:
if subtotal.scheme is not None:
# TODO cual es la naturaleza de tax_scheme_ident?
codigo_impuesto = subtotal.scheme.code
ValorImpuestoPara.setdefault(codigo_impuesto, form.Amount(0.0))
ValorImpuestoPara[codigo_impuesto] += subtotal.tax_amount
build_vars['ValorImpuestoPara'] = ValorImpuestoPara
build_vars['NitOFE'] = invoice.invoice_supplier.ident
build_vars['NumAdq'] = invoice.invoice_customer.ident
build_vars['TipoAmb'] = self._tipo_ambiente_int()
return build_vars
def _generate_cufe(self):
cufe = "".join(self.formatVars())
# crear hash...
h = hashlib.sha384()
h.update(cufe.encode('utf-8'))
return h.hexdigest()
class DianXMLExtensionCUFE(DianXMLExtensionCUDFE):
def __init__(self, invoice, clave_tecnica = '', tipo_ambiente = AMBIENTE_PRUEBAS):
self.tipo_ambiente = tipo_ambiente
self.clave_tecnica = clave_tecnica
self.invoice = invoice
def schemeName(self):
return 'CUFE-SHA384'
def buildVars(self):
build_vars = super().buildVars()
build_vars['ClTec'] = str(self.clave_tecnica)
return build_vars
def formatVars(self):
build_vars = self.buildVars()
CodImpuesto1 = build_vars['CodImpuesto1']
CodImpuesto2 = build_vars['CodImpuesto2']
CodImpuesto3 = build_vars['CodImpuesto3']
return [
'%s' % build_vars['NumFac'],
'%s' % build_vars['FecFac'],
'%s' % build_vars['HoraFac'],
form.Amount(build_vars['ValorBruto']).truncate_as_string(2),
CodImpuesto1,
build_vars['ValorImpuestoPara'].get(CodImpuesto1, form.Amount(0.0)).truncate_as_string(2),
CodImpuesto2,
build_vars['ValorImpuestoPara'].get(CodImpuesto2, form.Amount(0.0)).truncate_as_string(2),
CodImpuesto3,
build_vars['ValorImpuestoPara'].get(CodImpuesto3, form.Amount(0.0)).truncate_as_string(2),
build_vars['ValorTotalPagar'].truncate_as_string(2),
'%s' % build_vars['NitOFE'],
'%s' % build_vars['NumAdq'],
'%s' % build_vars['ClTec'],
'%d' % build_vars['TipoAmb'],
]
class DianXMLExtensionCUDE(DianXMLExtensionCUDFE):
def __init__(self, invoice, software_pin, tipo_ambiente = AMBIENTE_PRUEBAS):
self.tipo_ambiente = tipo_ambiente
self.software_pin = software_pin
self.invoice = invoice
def schemeName(self):
return 'CUDE-SHA384'
def buildVars(self):
build_vars = super().buildVars()
build_vars['Software-PIN'] = str(self.software_pin)
return build_vars
def formatVars(self):
build_vars = self.buildVars()
CodImpuesto1 = build_vars['CodImpuesto1']
CodImpuesto2 = build_vars['CodImpuesto2']
CodImpuesto3 = build_vars['CodImpuesto3']
return [
'%s' % build_vars['NumFac'],
'%s' % build_vars['FecFac'],
'%s' % build_vars['HoraFac'],
form.Amount(build_vars['ValorBruto']).truncate_as_string(2),
CodImpuesto1,
form.Amount(build_vars['ValorImpuestoPara'].get(CodImpuesto1, 0.0)).truncate_as_string(2),
CodImpuesto2,
form.Amount(build_vars['ValorImpuestoPara'].get(CodImpuesto2, 0.0)).truncate_as_string(2),
CodImpuesto3,
form.Amount(build_vars['ValorImpuestoPara'].get(CodImpuesto3, 0.0)).truncate_as_string(2),
form.Amount(build_vars['ValorTotalPagar']).truncate_as_string(2),
'%s' % build_vars['NitOFE'],
'%s' % build_vars['NumAdq'],
'%s' % build_vars['Software-PIN'],
'%d' % build_vars['TipoAmb'],
]
class DianXMLExtensionSoftwareProvider(FachoXMLExtension):
# RESOLUCION 0004: pagina 108
def __init__(self, nit, dv, id_software: str):
self.nit = nit
self.dv = dv
self.id_software = id_software
def build(self, fexml):
software_provider = fexml.fragment('./ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/sts:DianExtensions/sts:SoftwareProvider')
provider_id_attrs = SCHEME_AGENCY_ATTRS.copy()
provider_id_attrs.update({'schemeID': self.dv})
#DIAN 1.7.-2020: FAB23
provider_id_attrs.update({'schemeName': '31'})
software_provider.set_element('/sts:SoftwareProvider/sts:ProviderID', self.nit,
**provider_id_attrs)
software_provider.set_element('/sts:SoftwareProvider/sts:SoftwareID', self.id_software,
**SCHEME_AGENCY_ATTRS)
class DianXMLExtensionSoftwareSecurityCode(FachoXMLExtension):
# RESOLUCION 0001: pagina 535
def __init__(self, id_software: str, pin: str, invoice_ident: str):
self.id_software = id_software
self.pin = pin
self.invoice_ident = invoice_ident
def build(self, fexml):
dian_path = './ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/sts:DianExtensions/sts:SoftwareSecurityCode'
code = str(self.id_software) + str(self.pin) + str(self.invoice_ident)
m = hashlib.sha384()
m.update(code.encode('utf-8'))
fexml.set_element(dian_path, m.hexdigest())
fexml.set_attributes(dian_path, **SCHEME_AGENCY_ATTRS)
return '', []
class DianXMLExtensionSigner:
def __init__(self, pkcs12_path, passphrase=None, localpolicy=True):
self._pkcs12_data = open(pkcs12_path, 'rb').read()
self._passphrase = None
self._localpolicy = localpolicy
if passphrase:
self._passphrase = passphrase.encode('utf-8')
@classmethod
def from_bytes(cls, data, passphrase=None, localpolicy=True):
self = cls.__new__(cls)
self._pkcs12_data = data
self._passphrase = None
self._localpolicy = localpolicy
if passphrase:
self._passphrase = passphrase.encode('utf-8')
return self
def _element_extension_content(self, fachoxml):
return fachoxml.builder.xpath(fachoxml.root, './ext:UBLExtensions/ext:UBLExtension[2]/ext:ExtensionContent')
def sign_xml_string(self, document):
xml = LXMLBuilder.from_string(document)
signature = self.sign_xml_element(xml)
fachoxml = FachoXML(xml,nsmap=NAMESPACES)
#DIAN 1.7.-2020: FAB01
extcontent = self._element_extension_content(fachoxml)
fachoxml.append_element(extcontent, signature)
return fachoxml.tostring(xml_declaration=True, encoding='UTF-8')
def sign_xml_element(self, xml):
id_uuid = str(uuid.uuid4())
signature = xmlsig.template.create(
xmlsig.constants.TransformInclC14N,
xmlsig.constants.TransformRsaSha256,
"xmlsig-%s" % (id_uuid),
)
xml.append(signature)
ref = xmlsig.template.add_reference(
signature, xmlsig.constants.TransformSha256, uri="", name="xmldsig-%s-ref0" % (id_uuid)
)
xmlsig.template.add_transform(ref, xmlsig.constants.TransformEnveloped)
id_keyinfo = "xmldsig-%s-KeyInfo" % (id_uuid)
xmlsig.template.add_reference(
signature, xmlsig.constants.TransformSha256, uri="#%s" % (id_keyinfo), name="xmldsig-%s-ref1" % (id_uuid),
)
ki = xmlsig.template.ensure_key_info(signature, name=id_keyinfo)
data = xmlsig.template.add_x509_data(ki)
xmlsig.template.x509_data_add_certificate(data)
xmlsig.template.add_key_value(ki)
qualifying = xades.template.create_qualifying_properties(signature, 'XadesObjects', 'xades')
xades.utils.ensure_id(qualifying)
id_props = "xmldsig-%s-signedprops" % (id_uuid)
props_ref = xmlsig.template.add_reference(
signature, xmlsig.constants.TransformSha256, uri="#%s" % (id_props),
uri_type="http://uri.etsi.org/01903#SignedProperties"
)
xmlsig.template.add_transform(props_ref, xmlsig.constants.TransformInclC14N)
# TODO assert with http://www.sic.gov.co/hora-legal-colombiana
props = xades.template.create_signed_properties(qualifying, name=id_props, datetime=datetime.now())
xades.template.add_claimed_role(props, "supplier")
policy = xades.policy.GenericPolicyId(
POLICY_ID,
POLICY_NAME,
xmlsig.constants.TransformSha256)
ctx = xades.XAdESContext(policy)
ctx.load_pkcs12(OpenSSL.crypto.load_pkcs12(self._pkcs12_data,
self._passphrase))
if self._localpolicy:
with mock_xades_policy():
ctx.sign(signature)
ctx.verify(signature)
else:
ctx.sign(signature)
ctx.verify(signature)
#xmlsig take parent root
xml.remove(signature)
return signature
def build(self, fachoxml):
signature = self.sign_xml_element(fachoxml.root)
extcontent = self._element_extension_content(fachoxml)
fachoxml.append_element(extcontent, signature)
class DianXMLExtensionAuthorizationProvider(FachoXMLExtension):
# RESOLUCION 0004: pagina 176
def build(self, fexml):
attrs = {'schemeID': '4', 'schemeName': '31'}
attrs.update(SCHEME_AGENCY_ATTRS)
authorization_provider = fexml.fragment('./ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/sts:DianExtensions/sts:AuthorizationProvider')
authorization_provider.set_element('./sts:AuthorizationProviderID',
'800197268',
**attrs)
class DianXMLExtensionInvoiceSource(FachoXMLExtension):
# CAB13
def build(self, fexml):
dian_path = '/fe:CreditNote/ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/sts:DianExtensions/sts:InvoiceSource/cbc:IdentificationCode'
fexml.set_element(dian_path, 'CO',
listAgencyID="6",
listAgencyName="United Nations Economic Commission for Europe",
listSchemeURI="urn:oasis:names:specification:ubl:codelist:gc:CountryIdentificationCode-2.1")
class DianXMLExtensionInvoiceAuthorization(FachoXMLExtension):
# RESOLUCION 0004: pagina 106
def __init__(self, authorization: str,
period_startdate: datetime, period_enddate: datetime,
prefix: str, from_: int, to: int):
self.authorization = authorization
self.period_startdate = period_startdate
self.period_enddate = period_enddate
self.prefix = prefix
self.from_ = from_
self.to = to
def build(self, fexml):
invoice_control = fexml.fragment('./ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/sts:DianExtensions/sts:InvoiceControl')
invoice_control.set_element('/sts:InvoiceControl/sts:InvoiceAuthorization', self.authorization)
invoice_control.set_element('/sts:InvoiceControl/sts:AuthorizationPeriod/cbc:StartDate',
self.period_startdate.strftime('%Y-%m-%d'))
invoice_control.set_element('/sts:InvoiceControl/sts:AuthorizationPeriod/cbc:EndDate',
self.period_enddate.strftime('%Y-%m-%d'))
invoice_control.set_element('/sts:InvoiceControl/sts:AuthorizedInvoices/sts:Prefix',
self.prefix)
invoice_control.set_element('/sts:InvoiceControl/sts:AuthorizedInvoices/sts:From',
self.from_)
invoice_control.set_element('/sts:InvoiceControl/sts:AuthorizedInvoices/sts:To',
self.to)
fexml.set_element('./ext:UBLExtensions/ext:UBLExtension/ext:ExtensionContent/sts:DianExtensions/sts:InvoiceSource/cbc:IdentificationCode',
'CO',
#DIAN 1.7.-2020: FAB15
listAgencyID="6",
#DIAN 1.7.-2020: FAB16
listAgencyName="United Nations Economic Commission for Europe",
#DIAN 1.7.-2020: FAB17
listSchemeURI="urn:oasis:names:specification:ubl:codelist:gc:CountryIdentificationCode-2.1"
)
class DianZIP:
# RESOLUCION 0001: pagina 540
MAX_FILES = 50
def __init__(self, file_like):
self.zipfile = zipfile.ZipFile(file_like, mode='w', compression=zipfile.ZIP_DEFLATED)
self.num_files = 0
def add_xml(self, name, xml_data):
self.num_files += 1
# TODO cual es la norma para los nombres de archivos?
m = hashlib.sha256()
m.update(name.encode('utf-8'))
filename = m.hexdigest() + '.xml'
with self.zipfile.open(filename, 'w') as fp:
fp.write(xml_data.encode('utf-8'))
return filename
# DEPRECATED usar add_xml
def add_invoice_xml(self, name, xml_data):
return self.add_xml(name, xml_data)
def __enter__(self):
"""
Facilita el uso de esta manera:
f = open('xxx', 'rb')
with DianZIP(f) as zip:
zip.add_invoice_xml('name', 'data xml')
"""
return self
def __exit__(self, type, value, traceback):
return self.zipfile.close()
class DianXMLExtensionSignerVerifier:
def __init__(self, pkcs12_path_or_bytes, passphrase=None, localpolicy=True):
self._pkcs12_path_or_bytes = pkcs12_path_or_bytes
self._passphrase = None
self._localpolicy = localpolicy
if passphrase:
self._passphrase = passphrase.encode('utf-8')
def verify_string(self, document):
# Obtener FachoXML
xml = LXMLBuilder.from_string(document)
fachoxml = FachoXML(xml,nsmap=NAMESPACES)
# Obtener Signature
signature = fachoxml.builder.xpath(fachoxml.root, '//ds:Signature')
assert signature is not None
# Se mueve Signature a elemento raiz para realizar verificaion
signature.getparent().remove(signature)
fachoxml.root.append(signature)
# Verificar archivo usando Signature
pkcs12_data = self._pkcs12_path_or_bytes
if isinstance(self._pkcs12_path_or_bytes, str):
pkcs12_data = open(self._pkcs12_path_or_bytes, 'rb').read()
ctx = xades.XAdESContext()
ctx.load_pkcs12(OpenSSL.crypto.load_pkcs12(pkcs12_data,
self._passphrase))
try:
if self._localpolicy:
with mock_xades_policy():
ctx.verify(signature)
else:
ctx.verify(signature)
return True
except:
return False