se cumple con soap dian

FossilOrigin-Name: 186895c8f4bf40f281adea3397956f40ef06ee55980016e617b3d3f3bac7c3ff
This commit is contained in:
2020-05-29 15:33:12 +00:00
parent cfaf13ff8d
commit 8c53f91940
4 changed files with 494 additions and 9 deletions

View File

@@ -2,8 +2,9 @@ from facho import facho
import zeep
from zeep.wsse.username import UsernameToken
from zeep.wsse.signature import Signature
from .wsse.signature import Signature, BinarySignature
from zeep.wsa import WsAddressingPlugin
import xmlsec
import urllib.request
from datetime import datetime
from dataclasses import dataclass, asdict, field
@@ -13,6 +14,8 @@ import hashlib
import secrets
import base64
from . import zeep_plugins
__all__ = ['DianClient',
'ConsultaResolucionesFacturacionPeticion',
'ConsultaResolucionesFacturacionRespuesta']
@@ -91,10 +94,12 @@ class SendBillAsync:
return {}
@dataclass
class SendTestSetAsync:
class SendTestSetAsync(SOAPService):
fileName: str
contentFile: str
testSetId: str = ''
def get_wsdl(self):
return 'https://colombia-dian-webservices-input-sbx.azurewebsites.net/WcfDianCustomerServices.svc?wsdl'
@@ -105,6 +110,50 @@ class SendTestSetAsync:
def build_response(self, as_dict):
return {}
@dataclass
class SendBillSync(SOAPService):
fileName: str
contentFile: bytes
def get_wsdl(self):
return 'https://colombia-dian-webservices-input-sbx.azurewebsites.net/WcfDianCustomerServices.svc?wsdl'
def get_service(self):
return 'SendBillSync'
def build_response(self, as_dict):
return {}
@dataclass
class GetStatus(SOAPService):
trackId: bytes
def get_wsdl(self):
return 'https://colombia-dian-webservices-input-sbx.azurewebsites.net/WcfDianCustomerServices.svc?wsdl'
def get_service(self):
return 'GetStatus'
def build_response(self, as_dict):
return {}
class Habilitacion:
WSDL = 'https://vpfe-hab.dian.gov.co/WcfDianCustomerServices.svc?wsdl'
class SendBillSync(SendBillSync):
def get_wsdl(self):
return Habilitacion.WSDL
class SendTestSetAsync(SendTestSetAsync):
def get_wsdl(self):
return Habilitacion.WSDL
class GetStatus(GetStatus):
def get_wsdl(self):
return Habilitacion.WSDL
class DianGateway:
@@ -147,7 +196,17 @@ class DianSignatureClient(DianGateway):
self.password = password
def _open(self, service):
return zeep.Client(service.get_wsdl(), wsse=Signature(
self.private_key_path, self.public_key_path, self.password))
# RESOLUCCION 0004: pagina 756
from zeep.wsse import utils
client = zeep.Client(service.get_wsdl(), wsse=
[
BinarySignature(
self.private_key_path, self.public_key_path, self.password,
signature_method=xmlsec.Transform.RSA_SHA256,
digest_method=xmlsec.Transform.SHA256)
],
)
return client