diff --git a/facho/cli.py b/facho/cli.py
index d9a56e6..9a41b40 100644
--- a/facho/cli.py
+++ b/facho/cli.py
@@ -1,4 +1,6 @@
import sys
+import base64
+
import click
import logging.config
@@ -45,18 +47,40 @@ def consultaResolucionesFacturacion(nit, nit_proveedor, id_software, username, p
@click.command()
@click.option('--private-key', required=True)
@click.option('--public-key', required=True)
+@click.option('--habilitacion/--produccion', default=False)
@click.option('--password')
@click.argument('filename', required=True)
@click.argument('zipfile', type=click.Path(exists=True))
-def SendTestSetAsync(private_key, public_key, password, filename, zipfile):
+def soap_send_bill_sync(private_key, public_key, habilitacion, password, filename, zipfile):
from facho.fe.client import dian
client = dian.DianSignatureClient(private_key, public_key, password=password)
- resp = client.request(dian.SendTestSetAsync(
- filename, open(zipfile, 'r').read().encode('utf-8')
+ req = dian.SendBillSync
+ if habilitacion:
+ req = dian.Habilitacion.SendBillSync
+ resp = client.request(req(
+ filename,
+ open(zipfile, 'rb').read()
))
print(resp)
+@click.command()
+@click.option('--private-key', required=True)
+@click.option('--public-key', required=True)
+@click.option('--habilitacion/--produccion', default=False)
+@click.option('--password')
+@click.option('--track-id', required=True)
+def soap_get_status(private_key, public_key, habilitacion, password, track_id):
+ from facho.fe.client import dian
+
+ client = dian.DianSignatureClient(private_key, public_key, password=password)
+ req = dian.GetStatus
+ if habilitacion:
+ req = dian.Habilitacion.GetStatus
+ resp = client.request(req(
+ trackId = track_id
+ ))
+ print(resp)
@click.command()
@click.option('--private-key', type=click.Path(exists=True))
@@ -97,5 +121,6 @@ def main():
pass
main.add_command(consultaResolucionesFacturacion)
-main.add_command(SendTestSetAsync)
+main.add_command(soap_send_bill_sync)
+main.add_command(soap_get_status)
main.add_command(generate_invoice)
diff --git a/facho/fe/client/dian.py b/facho/fe/client/dian.py
index bbfbc44..448a1d4 100644
--- a/facho/fe/client/dian.py
+++ b/facho/fe/client/dian.py
@@ -2,8 +2,9 @@ from facho import facho
import zeep
from zeep.wsse.username import UsernameToken
-from zeep.wsse.signature import Signature
-
+from .wsse.signature import Signature, BinarySignature
+from zeep.wsa import WsAddressingPlugin
+import xmlsec
import urllib.request
from datetime import datetime
from dataclasses import dataclass, asdict, field
@@ -13,6 +14,8 @@ import hashlib
import secrets
import base64
+from . import zeep_plugins
+
__all__ = ['DianClient',
'ConsultaResolucionesFacturacionPeticion',
'ConsultaResolucionesFacturacionRespuesta']
@@ -91,10 +94,12 @@ class SendBillAsync:
return {}
+
@dataclass
-class SendTestSetAsync:
+class SendTestSetAsync(SOAPService):
fileName: str
contentFile: str
+ testSetId: str = ''
def get_wsdl(self):
return 'https://colombia-dian-webservices-input-sbx.azurewebsites.net/WcfDianCustomerServices.svc?wsdl'
@@ -105,6 +110,50 @@ class SendTestSetAsync:
def build_response(self, as_dict):
return {}
+@dataclass
+class SendBillSync(SOAPService):
+ fileName: str
+ contentFile: bytes
+
+ def get_wsdl(self):
+ return 'https://colombia-dian-webservices-input-sbx.azurewebsites.net/WcfDianCustomerServices.svc?wsdl'
+
+ def get_service(self):
+ return 'SendBillSync'
+
+ def build_response(self, as_dict):
+ return {}
+
+
+@dataclass
+class GetStatus(SOAPService):
+ trackId: bytes
+
+ def get_wsdl(self):
+ return 'https://colombia-dian-webservices-input-sbx.azurewebsites.net/WcfDianCustomerServices.svc?wsdl'
+
+ def get_service(self):
+ return 'GetStatus'
+
+ def build_response(self, as_dict):
+ return {}
+
+
+class Habilitacion:
+ WSDL = 'https://vpfe-hab.dian.gov.co/WcfDianCustomerServices.svc?wsdl'
+
+ class SendBillSync(SendBillSync):
+ def get_wsdl(self):
+ return Habilitacion.WSDL
+
+ class SendTestSetAsync(SendTestSetAsync):
+ def get_wsdl(self):
+ return Habilitacion.WSDL
+
+ class GetStatus(GetStatus):
+ def get_wsdl(self):
+ return Habilitacion.WSDL
+
class DianGateway:
@@ -147,7 +196,17 @@ class DianSignatureClient(DianGateway):
self.password = password
def _open(self, service):
- return zeep.Client(service.get_wsdl(), wsse=Signature(
- self.private_key_path, self.public_key_path, self.password))
+ # RESOLUCCION 0004: pagina 756
+ from zeep.wsse import utils
+
+ client = zeep.Client(service.get_wsdl(), wsse=
+ [
+ BinarySignature(
+ self.private_key_path, self.public_key_path, self.password,
+ signature_method=xmlsec.Transform.RSA_SHA256,
+ digest_method=xmlsec.Transform.SHA256)
+ ],
+ )
+ return client
diff --git a/facho/fe/client/wsse/__init__.py b/facho/fe/client/wsse/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/facho/fe/client/wsse/signature.py b/facho/fe/client/wsse/signature.py
new file mode 100644
index 0000000..be90c2f
--- /dev/null
+++ b/facho/fe/client/wsse/signature.py
@@ -0,0 +1,401 @@
+"""Functions for WS-Security (WSSE) signature creation and verification.
+
+Heavily based on test examples in https://github.com/mehcode/python-xmlsec as
+well as the xmlsec documentation at https://www.aleksey.com/xmlsec/.
+
+Reading the xmldsig, xmlenc, and ws-security standards documents, though
+admittedly painful, will likely assist in understanding the code in this
+module.
+
+"""
+import pytz
+from datetime import datetime, timedelta
+from lxml import etree
+from lxml.etree import QName
+
+from zeep import ns
+from zeep.exceptions import SignatureVerificationFailed
+from zeep.utils import detect_soap_env
+from zeep.wsdl.utils import get_or_create_header
+from zeep.wsse.utils import ensure_id, get_security_header
+from zeep.wsse import utils
+
+try:
+ import xmlsec
+except ImportError:
+ xmlsec = None
+
+
+# SOAP envelope
+SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"
+
+
+def _read_file(f_name):
+ with open(f_name, "rb") as f:
+ return f.read()
+
+
+def _make_sign_key(key_data, cert_data, password):
+ key = xmlsec.Key.from_memory(key_data, xmlsec.KeyFormat.PEM, password)
+ key.load_cert_from_memory(cert_data, xmlsec.KeyFormat.PEM)
+ return key
+
+
+def _make_verify_key(cert_data):
+ key = xmlsec.Key.from_memory(cert_data, xmlsec.KeyFormat.CERT_PEM, None)
+ return key
+
+
+class MemorySignature(object):
+ """Sign given SOAP envelope with WSSE sig using given key and cert."""
+
+ def __init__(
+ self,
+ key_data,
+ cert_data,
+ password=None,
+ signature_method=None,
+ digest_method=None,
+ expires_dt=None
+ ):
+ check_xmlsec_import()
+
+ self.key_data = key_data
+ self.cert_data = cert_data
+ self.password = password
+ self.digest_method = digest_method
+ self.signature_method = signature_method
+ self.expires_dt = expires_dt
+
+ def apply(self, envelope, headers):
+ key = _make_sign_key(self.key_data, self.cert_data, self.password)
+ _sign_envelope_with_key(
+ envelope, key, self.signature_method, self.digest_method, expires_dt=self.expires_dt
+ )
+ return envelope, headers
+
+ def verify(self, envelope):
+ key = _make_verify_key(self.cert_data)
+ _verify_envelope_with_key(envelope, key)
+ return envelope
+
+
+class Signature(MemorySignature):
+ """Sign given SOAP envelope with WSSE sig using given key file and cert file."""
+
+ def __init__(
+ self,
+ key_file,
+ certfile,
+ password=None,
+ signature_method=None,
+ digest_method=None,
+ ):
+ super(Signature, self).__init__(
+ _read_file(key_file),
+ _read_file(certfile),
+ password,
+ signature_method,
+ digest_method,
+ )
+
+
+class BinarySignature(Signature):
+ """Sign given SOAP envelope with WSSE sig using given key file and cert file.
+
+ Place the key information into BinarySecurityElement."""
+
+ def apply(self, envelope, headers):
+ key = _make_sign_key(self.key_data, self.cert_data, self.password)
+ _sign_envelope_with_key_binary(
+ envelope, key, self.signature_method, self.digest_method, expires_dt = self.expires_dt
+ )
+ return envelope, headers
+
+
+def check_xmlsec_import():
+ if xmlsec is None:
+ raise ImportError(
+ "The xmlsec module is required for wsse.Signature()\n"
+ + "You can install xmlsec with: pip install xmlsec\n"
+ + "or install zeep via: pip install zeep[xmlsec]\n"
+ )
+
+
+def sign_envelope(
+ envelope,
+ keyfile,
+ certfile,
+ password=None,
+ signature_method=None,
+ digest_method=None,
+):
+ """Sign given SOAP envelope with WSSE sig using given key and cert.
+
+ Sign the wsu:Timestamp node in the wsse:Security header and the soap:Body;
+ both must be present.
+
+ Add a ds:Signature node in the wsse:Security header containing the
+ signature.
+
+ Use EXCL-C14N transforms to normalize the signed XML (so that irrelevant
+ whitespace or attribute ordering changes don't invalidate the
+ signature). Use SHA1 signatures.
+
+ Expects to sign an incoming document something like this (xmlns attributes
+ omitted for readability):
+
+
+
+
+
+ 2015-06-25T21:53:25.246276+00:00
+ 2015-06-25T21:58:25.246276+00:00
+
+
+
+
+ ...
+
+
+
+ After signing, the sample document would look something like this (note the
+ added wsu:Id attr on the soap:Body and wsu:Timestamp nodes, and the added
+ ds:Signature node in the header, with ds:Reference nodes with URI attribute
+ referencing the wsu:Id of the signed nodes):
+
+
+
+
+
+
+
+
+
+
+
+
+
+ nnjjqTKxwl1hT/2RUsBuszgjTbI=
+
+
+
+
+
+
+ qAATZaSqAr9fta9ApbGrFWDuCCQ=
+
+
+ Hz8jtQb...bOdT6ZdTQ==
+
+
+
+ MIIDnzC...Ia2qKQ==
+
+ ...
+ ...
+
+
+
+
+
+
+ 2015-06-25T22:00:29.821700+00:00
+ 2015-06-25T22:05:29.821700+00:00
+
+
+
+
+ ...
+
+
+
+ """
+ # Load the signing key and certificate.
+ key = _make_sign_key(_read_file(keyfile), _read_file(certfile), password)
+ return _sign_envelope_with_key(envelope, key, signature_method, digest_method)
+
+def get_timestamp(timestamp = None, delta=None):
+ timestamp = timestamp or datetime.utcnow()
+ if delta:
+ timestamp += delta
+
+ format_ = '%Y-%m-%dT%H:%M:%SZ'
+ timestamp = timestamp.replace(tzinfo=pytz.utc, microsecond=0)
+ return timestamp.strftime(format_)
+
+def _append_timestamp(security, expires_dt=None):
+ if expires_dt is None:
+ expires_dt = timedelta(seconds=6000)
+
+ etimestamp = utils.WSU.Timestamp({'{http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd}Id': utils.get_unique_id()})
+ etimestamp.append(utils.WSU.Created(get_timestamp()))
+ etimestamp.append(utils.WSU.Expires(get_timestamp(delta=expires_dt)))
+ security.insert(0, etimestamp)
+ if etree.LXML_VERSION[:2] >= (3, 5):
+ etree.cleanup_namespaces(security,
+ keep_ns_prefixes = security.nsmap,
+ top_nsmap=utils.NSMAP)
+ else:
+ etree.cleanup_namespaces(header)
+
+def _signature_prepare(envelope, key, signature_method, digest_method, expires_dt=None):
+ """Prepare envelope and sign."""
+ soap_env = detect_soap_env(envelope)
+
+ # Create the Signature node.
+ signature = xmlsec.template.create(
+ envelope,
+ xmlsec.Transform.EXCL_C14N,
+ signature_method or xmlsec.Transform.RSA_SHA1,
+ )
+
+ # Add a KeyInfo node with X509Data child to the Signature. XMLSec will fill
+ # in this template with the actual certificate details when it signs.
+ key_info = xmlsec.template.ensure_key_info(signature)
+ x509_data = xmlsec.template.add_x509_data(key_info)
+ xmlsec.template.x509_data_add_issuer_serial(x509_data)
+ xmlsec.template.x509_data_add_certificate(x509_data)
+
+ # Insert the Signature node in the wsse:Security header.
+ security = get_security_header(envelope)
+ security.insert(0, signature)
+
+ # Perform the actual signing.
+ ctx = xmlsec.SignatureContext()
+ ctx.key = key
+
+ header = get_or_create_header(envelope)
+
+ # DIAN
+ _sign_node(ctx, signature, header.find(QName(ns.WSA, "To")), digest_method)
+ _append_timestamp(security, expires_dt=expires_dt)
+
+ timestamp = security.find(QName(ns.WSU, "Timestamp"))
+ if timestamp != None:
+ _sign_node(ctx, signature, timestamp, digest_method)
+ ctx.sign(signature)
+
+ # Place the X509 data inside a WSSE SecurityTokenReference within
+ # KeyInfo. The recipient expects this structure, but we can't rearrange
+ # like this until after signing, because otherwise xmlsec won't populate
+ # the X509 data (because it doesn't understand WSSE).
+ sec_token_ref = etree.SubElement(key_info, QName(ns.WSSE, "SecurityTokenReference"))
+ return security, sec_token_ref, x509_data
+
+
+def _sign_envelope_with_key(envelope, key, signature_method, digest_method, expires_dt=None):
+ _, sec_token_ref, x509_data = _signature_prepare(
+ envelope, key, signature_method, digest_method, expires_dt=expires_dt
+ )
+ sec_token_ref.append(x509_data)
+
+
+def _sign_envelope_with_key_binary(envelope, key, signature_method, digest_method, expires_dt=None):
+ security, sec_token_ref, x509_data = _signature_prepare(
+ envelope, key, signature_method, digest_method, expires_dt=expires_dt
+ )
+ ref = etree.SubElement(
+ sec_token_ref,
+ QName(ns.WSSE, "Reference"),
+ {
+ "ValueType": "http://docs.oasis-open.org/wss/2004/01/"
+ "oasis-200401-wss-x509-token-profile-1.0#X509v3"
+ },
+ )
+ bintok = etree.Element(
+ QName(ns.WSSE, "BinarySecurityToken"),
+ {
+ "ValueType": "http://docs.oasis-open.org/wss/2004/01/"
+ "oasis-200401-wss-x509-token-profile-1.0#X509v3",
+ "EncodingType": "http://docs.oasis-open.org/wss/2004/01/"
+ "oasis-200401-wss-soap-message-security-1.0#Base64Binary",
+ },
+ )
+ ref.attrib["URI"] = "#" + ensure_id(bintok)
+ bintok.text = x509_data.find(QName(ns.DS, "X509Certificate")).text
+ security.insert(1, bintok)
+ x509_data.getparent().remove(x509_data)
+
+
+def verify_envelope(envelope, certfile):
+ """Verify WS-Security signature on given SOAP envelope with given cert.
+
+ Expects a document like that found in the sample XML in the ``sign()``
+ docstring.
+
+ Raise SignatureVerificationFailed on failure, silent on success.
+
+ """
+ key = _make_verify_key(_read_file(certfile))
+ return _verify_envelope_with_key(envelope, key)
+
+
+def _verify_envelope_with_key(envelope, key):
+ soap_env = detect_soap_env(envelope)
+
+ header = envelope.find(QName(soap_env, "Header"))
+ if header is None:
+ raise SignatureVerificationFailed()
+
+ security = header.find(QName(ns.WSSE, "Security"))
+ signature = security.find(QName(ns.DS, "Signature"))
+
+ ctx = xmlsec.SignatureContext()
+
+ # Find each signed element and register its ID with the signing context.
+ refs = signature.xpath("ds:SignedInfo/ds:Reference", namespaces={"ds": ns.DS})
+ for ref in refs:
+ # Get the reference URI and cut off the initial '#'
+ referenced_id = ref.get("URI")[1:]
+ referenced = envelope.xpath(
+ "//*[@wsu:Id='%s']" % referenced_id, namespaces={"wsu": ns.WSU}
+ )[0]
+ ctx.register_id(referenced, "Id", ns.WSU)
+
+ ctx.key = key
+
+ try:
+ ctx.verify(signature)
+ except xmlsec.Error:
+ # Sadly xmlsec gives us no details about the reason for the failure, so
+ # we have nothing to pass on except that verification failed.
+ raise SignatureVerificationFailed()
+
+
+def _sign_node(ctx, signature, target, digest_method=None):
+ """Add sig for ``target`` in ``signature`` node, using ``ctx`` context.
+
+ Doesn't actually perform the signing; ``ctx.sign(signature)`` should be
+ called later to do that.
+
+ Adds a Reference node to the signature with URI attribute pointing to the
+ target node, and registers the target node's ID so XMLSec will be able to
+ find the target node by ID when it signs.
+
+ """
+
+ # Ensure the target node has a wsu:Id attribute and get its value.
+ node_id = ensure_id(target)
+
+ # Unlike HTML, XML doesn't have a single standardized Id. WSSE suggests the
+ # use of the wsu:Id attribute for this purpose, but XMLSec doesn't
+ # understand that natively. So for XMLSec to be able to find the referenced
+ # node by id, we have to tell xmlsec about it using the register_id method.
+ ctx.register_id(target, "Id", ns.WSU)
+
+ # Add reference to signature with URI attribute pointing to that ID.
+ ref = xmlsec.template.add_reference(
+ signature, digest_method or xmlsec.Transform.SHA1, uri="#" + node_id
+ )
+ # This is an XML normalization transform which will be performed on the
+ # target node contents before signing. This ensures that changes to
+ # irrelevant whitespace, attribute ordering, etc won't invalidate the
+ # signature.
+ xmlsec.template.add_transform(ref, xmlsec.Transform.EXCL_C14N)