oc_template/modules/account_co_co/account.py

159 lines
5.4 KiB
Python
Raw Normal View History

2024-06-15 11:00:00 -05:00
from sql.aggregate import Sum
from sql.functions import Abs, Round
from sql.operators import Exists
from trytond.pool import Pool, PoolMeta
from trytond.model.exceptions import AccessError
from .exceptions import PostError
from trytond.i18n import gettext
from trytond.model import ModelView
from decimal import Decimal
from itertools import groupby
from trytond.transaction import Transaction
from trytond.tools import reduce_ids, grouped_slice
__all__ = ['Account', 'Move', 'Line']
class Account(metaclass=PoolMeta):
    """Extension of the ``account.account`` model.

    The only change made here is to the ``party_required`` field, whose
    ``domain`` and ``states`` are overwritten during class setup.
    """
    __name__ = 'account.account'

    @classmethod
    def __setup__(cls):
        """Run the inherited setup, then reset ``party_required``.

        ``domain`` is replaced by a list holding one empty clause and
        ``states`` by an empty dict.
        """
        super().__setup__()
        party_required = cls.party_required
        # NOTE(review): presumably this lifts restrictions declared on the
        # field by the base module (not visible in this file) -- confirm.
        party_required.domain = [()]
        party_required.states = {}
class Move(metaclass=PoolMeta):
    """Extension of the ``account.move`` model.

    Overrides record creation (automatic numbering from the journal
    sequence) and posting (validation, post numbering and reconciliation
    of zero-amount lines).
    """
    __name__ = 'account.move'

    @classmethod
    def create(cls, vlist):
        """Create moves, numbering those created without a number.

        For every values dict with no truthy ``number``, the journal is
        taken from the values or, failing that, from the transaction
        context, and the company from the values or ``default_company()``.
        When that journal has a ``sequence`` for the company, the next
        value of the sequence becomes the move number.

        :param vlist: list of value dictionaries; each one is copied, so
            the caller's dictionaries are never modified.
        :return: the result of the parent ``create`` for the completed
            values.
        """
        pool = Pool()
        Journal = pool.get('account.journal')
        context = Transaction().context

        # journal id -> Journal instance, so each journal is built once
        journals = {}
        default_company = cls.default_company()
        vlist = [x.copy() for x in vlist]
        for vals in vlist:
            if vals.get('number'):
                continue
            journal_id = vals.get('journal', context.get('journal'))
            company_id = vals.get('company', default_company)
            if not journal_id:
                continue
            if journal_id not in journals:
                journals[journal_id] = Journal(journal_id)
            journal = journals[journal_id]
            sequence = journal.get_multivalue(
                'sequence', company=company_id)
            if sequence:
                with Transaction().set_context(company=company_id):
                    vals['number'] = sequence.get()
        return super().create(vlist)

    @classmethod
    @ModelView.button
    def post(cls, moves):
        """Validate and post ``moves``.

        Moves are processed per run of consecutive moves sharing a company
        and, within a run, in slices produced by ``grouped_slice``.  For
        each slice two SQL checks are made; then every move is flagged
        ``'posted'`` and, if it has no ``post_number`` yet, receives a
        ``post_date`` and a ``post_number``.  Finally the lines of the
        moves whose debit and credit are both zero, and whose account has
        ``reconcile`` set, are reconciled together per (party, account),
        and the moves are saved.

        :param moves: the move records to post.
        :raises PostError: if a move has no line, or if the rounded
            absolute value of its summed ``debit - credit`` is greater
            than or equal to the absolute rounding of the company
            currency.
        """
        pool = Pool()
        Date = pool.get('ir.date')
        Line = pool.get('account.move.line')
        move = cls.__table__()
        line = Line.__table__()
        cursor = Transaction().connection.cursor()

        to_reconcile = []
        # groupby only merges *consecutive* moves of the same company; a
        # company split over several runs is simply checked run by run.
        for company, c_moves in groupby(moves, lambda m: m.company):
            currency = company.currency
            for sub_moves in grouped_slice(list(c_moves)):
                sub_moves_ids = [m.id for m in sub_moves]

                # A move of the slice without any line cannot be posted.
                cursor.execute(*move.select(
                        move.id,
                        where=reduce_ids(move.id, sub_moves_ids)
                        & ~Exists(line.select(
                                line.move,
                                where=line.move == move.id))))
                try:
                    move_id, = cursor.fetchone()
                except TypeError:
                    # No row fetched (None cannot be unpacked): every move
                    # of the slice has at least one line.
                    pass
                else:
                    raise PostError(
                        gettext('account.msg_post_empty_move',
                            move=cls(move_id).rec_name))

                # A move whose lines do not balance cannot be posted.
                cursor.execute(*line.select(
                        line.move,
                        where=reduce_ids(line.move, sub_moves_ids),
                        group_by=line.move,
                        having=Abs(Round(
                                Sum(line.debit - line.credit),
                                currency.digits)) >= abs(currency.rounding)))
                try:
                    move_id, = cursor.fetchone()
                except TypeError:
                    # No row fetched: every move of the slice is balanced.
                    pass
                else:
                    raise PostError(
                        gettext('account.msg_post_unbalanced_move',
                            move=cls(move_id).rec_name))

                # Collect the ids of the zero-amount lines of the slice.
                cursor.execute(*line.select(
                        line.id,
                        where=reduce_ids(line.move, sub_moves_ids)
                        & (line.debit == Decimal(0))
                        & (line.credit == Decimal(0))))
                to_reconcile.extend(line_id for line_id, in cursor)

        for record in moves:
            record.state = 'posted'
            if not record.post_number:
                with Transaction().set_context(company=record.company.id):
                    record.post_date = Date.today()
                # NOTE(review): the source this was restored from had lost
                # its indentation; this statement is assumed to follow the
                # ``with`` block rather than sit inside it -- confirm.
                record.post_number = (
                    record.period.post_move_sequence_used.get())

        def keyfunc(move_line):
            """Return a sortable (party, account) key made of plain ids.

            Keying on the records themselves breaks ``sorted`` as soon as
            lines with and without a party are mixed, because ``None``
            cannot be ordered against a record.  Lines without a party
            sort first.
            """
            party = move_line.party
            return (
                party is not None,
                party.id if party is not None else 0,
                move_line.account.id)

        to_reconcile = Line.browse(sorted(
                [l for l in Line.browse(to_reconcile) if l.account.reconcile],
                key=keyfunc))
        for _, lines in groupby(to_reconcile, keyfunc):
            Line.reconcile(list(lines))
        cls.save(moves)
class Line(metaclass=PoolMeta):
    """Extension of the ``account.move.line`` model.

    Clears the ``states`` of the ``party`` field and replaces
    ``check_account`` with a version that accepts a party on any account.
    """
    __name__ = 'account.move.line'

    @classmethod
    def __setup__(cls):
        """Run the inherited setup, then empty ``states`` on ``party``."""
        super().__setup__()
        cls.party.states = {}

    @classmethod
    def check_account(cls, lines, field_names=None):
        """Check that each line uses a usable account and has its party.

        Nothing is checked when ``field_names`` is given and contains
        neither ``'account'`` nor ``'party'``.

        A party is only demanded when the account has ``party_required``
        set; a line carrying a party on an account that does not require
        one is accepted.

        :param lines: the move lines to check.
        :param field_names: optional set of modified field names.
        :raises AccessError: if the account of a line has no ``type`` or
            is ``closed``, or if it requires a party and the line has
            none.
        """
        if field_names and not (field_names & {'account', 'party'}):
            return
        for line in lines:
            account = line.account
            if not account.type or account.closed:
                raise AccessError(
                    gettext('account.msg_line_closed_account',
                        account=account.rec_name))
            # Only the "party missing" case can fail here; there is no
            # "party set but not allowed" error in this override.
            if account.party_required and not line.party:
                raise AccessError(
                    gettext('account.msg_line_party_required',
                        account=account.rec_name,
                        line=line.rec_name))