oc_template/modules/account_co_reports/party_book_account.py

285 lines
10 KiB
Python
Raw Permalink Normal View History

2024-06-15 11:00:00 -05:00
# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
from decimal import Decimal
import operator
from trytond.model import fields, ModelView, ModelSQL
from trytond.pool import Pool
from trytond.transaction import Transaction
from trytond.pyson import Eval
from sql import Column, Cast, Literal, Null
from sql.aggregate import Sum, Max, Count
from sql.operators import Concat
from sql.conditionals import Coalesce
# TODO: would PartyLedger be a better name?
class PartyBookAccount(ModelSQL, ModelView):
    'Party Book Account'
    # Read-only model built by ``table_query``: one row per
    # (company, party, account) having move lines in the period range taken
    # from the transaction context (``fiscalyear``, ``start_period``,
    # ``end_period``, ``posted``, ``company``).
    __name__ = 'account_co_reports.party_book_account'

    party = fields.Many2One('party.party', 'Party')
    company = fields.Many2One('company.company', 'Company')
    account = fields.Many2One('account.account', 'Account')
    name = fields.Function(fields.Char('Name'),
        'get_account', searcher='search_name')
    code = fields.Function(fields.Char('Code'),
        'get_account')
    debit = fields.Function(fields.Numeric('Debit',
            digits=(16, Eval('currency_digits', 2)),
            depends=['currency_digits']),
        'get_credit_debit')
    credit = fields.Function(fields.Numeric('Credit',
            digits=(16, Eval('currency_digits', 2)),
            depends=['currency_digits']),
        'get_credit_debit')
    balance = fields.Function(fields.Numeric('Balance',
            digits=(16, Eval('currency_digits', 2)),
            depends=['currency_digits']),
        'get_balance')
    currency_digits = fields.Function(fields.Integer('Currency Digits'),
        'get_currency_digits')

    @classmethod
    def _combine_column_id(cls, line):
        """Return the SQL expression used as the synthetic row id.

        The party id and the account id of ``line`` are cast to text,
        concatenated and cast back to ``int``.
        """
        # NOTE(review): plain concatenation is ambiguous (party 1 / account 23
        # and party 12 / account 3 both yield 123) and may not fit in ``int``
        # for large ids -- confirm whether a collision-free id is required.
        return Cast(Concat(Cast(line.party, 'varchar'),
                Cast(line.account, 'varchar')), 'int')

    @classmethod
    def get_period_ids(cls, name):
        """Return the ids of the periods lying before a range boundary.

        The boundary is selected by the prefix of ``name``:

        * ``start_``: periods of the context fiscal year that end on or
          before the first day of the context ``start_period``.  The start
          period itself is never part of the result, so it stays inside the
          range computed by ``get_range_period_ids``.
        * ``end_``: periods of the context fiscal year that end on or before
          the last day of the context ``end_period``; without an
          ``end_period`` the latest standard period of the fiscal year is
          used instead.

        An empty list is returned for any other name or when no boundary
        period can be determined.
        """
        pool = Pool()
        Period = pool.get('account.period')
        context = Transaction().context
        is_start = name.startswith('start_')
        is_end = not is_start and name.startswith('end_')

        period = None
        if is_start:
            if context.get('start_period'):
                period = Period(context.get('start_period'))
        elif is_end:
            if context.get('end_period'):
                period = Period(context.get('end_period'))
            else:
                periods = Period.search([
                        ('fiscalyear', '=', context.get('fiscalyear')),
                        ('type', '=', 'standard'),
                        ], order=[('start_date', 'DESC')], limit=1)
                if periods:
                    period, = periods
        if not period:
            return []

        if is_start:
            # Only what is already over when the start period begins.
            date_clause = ('end_date', '<=', period.start_date)
        else:
            date_clause = ('end_date', '<=', period.end_date)
        periods = Period.search([
                ('fiscalyear', '=', context.get('fiscalyear')),
                date_clause,
                ])
        period_ids = [p.id for p in periods]
        if is_start:
            # A one-day start period satisfies the date clause by itself;
            # drop it so that it is not subtracted from the range.
            period_ids = [i for i in period_ids if i != period.id]
        elif (period.start_date == period.end_date
                and period.id not in period_ids):
            period_ids.append(period.id)
        return period_ids

    @classmethod
    def get_range_period_ids(cls):
        """Return the ids of the periods from start to end period.

        The result is the set of ``end_`` period ids minus the ``start_``
        period ids.  ``[-1]`` is returned when the range is empty so that
        the list can still be used in an SQL ``IN`` clause.
        """
        before_start = set(cls.get_period_ids('start_period'))
        up_to_end = set(cls.get_period_ids('end_period'))
        return list(up_to_end - before_start) or [-1]

    @classmethod
    def _posted_query(cls, table):
        """Return the SQL condition restricting ``table`` to posted moves.

        The restriction only applies when the context key ``posted`` is
        truthy; otherwise a constant true literal is returned.
        """
        if Transaction().context.get('posted'):
            return table.state == 'posted'
        return Literal(True)

    @classmethod
    def table_query(cls):
        """Return the query providing the rows of this model.

        Move lines are joined with their move and account, limited to lines
        with a party, accounts with a type that are not closed, moves in the
        context period range and accounts of the context company, then
        grouped by company, party and account.
        """
        pool = Pool()
        context = Transaction().context
        Account = pool.get('account.account')
        MoveLine = pool.get('account.move.line')
        Move = pool.get('account.move')
        line = MoveLine.__table__()
        move = Move.__table__()
        account = Account.__table__()

        columns = [
            cls._combine_column_id(line).as_('id'),
            Literal(0).as_('create_uid'),
            Max(line.create_date).as_('create_date'),
            Literal(0).as_('write_uid'),
            Max(line.write_date).as_('write_date'),
            Count(line.account).as_('total_accounts'),
            line.party.as_('party'),
            move.company.as_('company'),
            line.account.as_('account'),
            ]
        tables = (line
            .join(move, condition=line.move == move.id)
            .join(account, condition=line.account == account.id))
        where = (cls._posted_query(move)
            & (line.party != Null)
            & (account.type != Null)
            & (account.closed != Literal(True))
            & (move.period.in_(cls.get_range_period_ids()))
            & (account.company == context.get('company')))
        return tables.select(*columns,
            where=where,
            group_by=(move.company, line.party, line.account))

    @classmethod
    def get_credit_debit(cls, records, names):
        """Compute the summed move line columns listed in ``names``.

        Returns ``{name: {record id: Decimal}}`` where every requested
        record defaults to ``Decimal(0)``.  Amounts are summed over the move
        lines of the context period range, honouring the ``posted`` context
        flag.
        """
        pool = Pool()
        Account = pool.get('account.account')
        MoveLine = pool.get('account.move.line')
        Move = pool.get('account.move')

        results = {
            name: {r.id: Decimal(0) for r in records} for name in names}
        if not records:
            # Nothing to sum, and an empty IN () list is not valid SQL.
            return results
        party_ids = list({r.party.id for r in records})
        account_ids = list({r.account.id for r in records})

        line = MoveLine.__table__()
        move = Move.__table__()
        account = Account.__table__()
        columns = [cls._combine_column_id(line).as_('id')]
        columns.extend(
            Sum(Coalesce(Column(line, name), 0)) for name in names)
        tables = (line
            .join(move, condition=line.move == move.id)
            .join(account, condition=line.account == account.id))
        where = (cls._posted_query(move)
            & (line.party.in_(party_ids))
            & (line.account.in_(account_ids))
            & (move.period.in_(cls.get_range_period_ids())))

        cursor = Transaction().connection.cursor()
        cursor.execute(*tables.select(*columns,
                where=where,
                group_by=(account.company, line.party, line.account)))
        for row in cursor.fetchall():
            record_id = row[0]
            for name, amount in zip(names, row[1:]):
                totals = results[name]
                # Party and account are filtered independently, so the query
                # may return (party, account) pairs that were not requested.
                if record_id in totals:
                    totals[record_id] += Decimal(str(amount or 0))
        return results

    @classmethod
    def search_name(cls, name, domain):
        """Translate a search on ``name`` into a search on the account.

        The clause is forwarded to the same field of the related account,
        so every domain operator is supported and the matched rows are rows
        of this model.
        """
        return [('account.' + name,) + tuple(domain[1:])]

    def get_balance(self, name):
        """Return the balance of the row, currently always ``0``."""
        # NOTE(review): placeholder value; presumably meant to be derived
        # from debit and credit -- confirm the expected sign convention.
        return 0

    def get_account(self, name):
        """Return the attribute ``name`` of the related account."""
        return getattr(self.account, name)

    @staticmethod
    def default_company():
        """Return the company of the transaction context, if any."""
        return Transaction().context.get('company')

    def get_currency_digits(self, name):
        """Return the number of digits of the company currency."""
        return self.company.currency.digits
class PartyBookAccountContext(ModelView):
    'Party Book Account'
    # Form model holding the filter values (fiscal year, period range,
    # company, posted flag); every default is read from the transaction
    # context.
    __name__ = 'account_co_reports.party_book_account.context'

    fiscalyear = fields.Many2One(
        'account.fiscalyear', 'Fiscal Year', required=True)
    start_period = fields.Many2One(
        'account.period', 'Start Period',
        domain=[('fiscalyear', '=', Eval('fiscalyear'))],
        depends=['fiscalyear', 'end_period'])
    end_period = fields.Many2One(
        'account.period', 'End Period',
        domain=[('fiscalyear', '=', Eval('fiscalyear'))],
        depends=['fiscalyear', 'start_period'])
    company = fields.Many2One(
        'company.company', 'Company', required=True)
    posted = fields.Boolean(
        'Posted Move', help='Show only posted move')

    @classmethod
    def default_fiscalyear(cls):
        """Return the context fiscal year, else the one found for the company."""
        FiscalYear = Pool().get('account.fiscalyear')
        context = Transaction().context
        # The fallback is looked up eagerly, even when the context already
        # provides a fiscal year.
        fallback = FiscalYear.find(context.get('company'), exception=False)
        return context.get('fiscalyear', fallback)

    @classmethod
    def default_start_period(cls):
        """Return the start period stored in the context, if any."""
        context = Transaction().context
        return context.get('start_period')

    @classmethod
    def default_end_period(cls):
        """Return the end period stored in the context, if any."""
        context = Transaction().context
        return context.get('end_period')

    @classmethod
    def default_company(cls):
        """Return the company stored in the context, if any."""
        context = Transaction().context
        return context.get('company')

    @classmethod
    def default_posted(cls):
        """Return the posted flag stored in the context, ``False`` if unset."""
        context = Transaction().context
        return context.get('posted', False)

    @fields.depends('fiscalyear', 'start_period', 'end_period')
    def on_change_fiscalyear(self):
        """Clear the periods that do not belong to the selected fiscal year."""
        for field_name in ('start_period', 'end_period'):
            period = getattr(self, field_name)
            if period and period.fiscalyear != self.fiscalyear:
                setattr(self, field_name, None)