Commit 43421951 authored by Arnaud Fontaine's avatar Arnaud Fontaine

erp5_accounting_l10n_fr: FEC Report: Allow to group by ledger or...

erp5_accounting_l10n_fr: FEC Report: Allow to group by ledger or ledger+portal_type in addition as per portal_type (the default).

This required refactoring the source files as the implementation was
portal_type-centric. This adds a dedicated Python Script with ProxyRole=Manager
to create Active Processes and thus avoid executing code as Manager as much as
possible.
parent b84f5e00
from Products.CMFActivity.ActiveResult import ActiveResult
portal = context.getPortalObject()
portal_type = portal.portal_types[portal_type]
active_process = portal.restrictedTraverse(active_process)
this_portal_type_active_process = portal.restrictedTraverse(this_portal_type_active_process)
this_journal_active_process = portal.restrictedTraverse(this_journal_active_process)
# XXX we need proxy role for this
result_list = this_portal_type_active_process.getResultList()
result_list = this_journal_active_process.getResultList()
if result_list:
journal_fragment = context.AccountingTransactionModule_viewJournalAsFECXML(
portal_type=portal_type,
journal_code=journal_code,
journal_lib=journal_lib,
result_list=result_list)
active_process.postResult(ActiveResult(detail=journal_fragment.encode('utf8').encode('zlib')))
# delete no longer needed active process
this_portal_type_active_process.getParentValue().manage_delObjects(ids=[this_portal_type_active_process.getId()])
this_journal_active_process.getParentValue().manage_delObjects(ids=[this_journal_active_process.getId()])
......@@ -50,7 +50,7 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>portal_type, active_process, this_portal_type_active_process</string> </value>
<value> <string>journal_code, journal_lib, active_process, this_journal_active_process</string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
......@@ -62,7 +62,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>AccountingTransactionModule_aggregateFrenchAccountingTransactionFileForPortalType</string> </value>
<value> <string>AccountingTransactionModule_aggregateFrenchAccountingTransactionFileForOneJournal</string> </value>
</item>
</dictionary>
</pickle>
......
return context.getPortalObject().portal_activities.newActiveProcess()
......@@ -50,11 +50,19 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>portal_type, section_uid_list, from_date, at_date, simulation_state, ledger, active_process, this_portal_type_active_process, tag, aggregate_tag, priority,</string> </value>
<value> <string></string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>AccountingTransactionModule_viewFrenchAccountingTransactionFileForPortalType</string> </value>
<value> <string>AccountingTransactionModule_createActiveProcessForFrenchAccountingTransactionFile</string> </value>
</item>
</dictionary>
</pickle>
......
from Products.ERP5Type.Message import translateString
portal = context.getPortalObject()
if group_by in ('ledger', 'portal_type_ledger') and ledger is None:
return context.Base_redirect('view', keep_items=dict(
portal_status_message=translateString("At least one Ledger must be selected")))
person_value = portal.portal_membership.getAuthenticatedMember().getUserValue()
if person_value is None:
portal.changeSkin(None)
......@@ -15,11 +19,11 @@ if portal.portal_activities.countMessageWithTag(tag) or \
return context.Base_redirect(form_id, keep_items=dict(
portal_status_message=translateString("Report already in progress.")))
context.activate().AccountingTransactionModule_viewFrenchAccountingTransactionFileActive(
section_category,
section_category_strict,
at_date,
group_by,
simulation_state,
ledger,
user_name=person_value.Person_getUserId(),
......
......@@ -50,7 +50,7 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>section_category, section_category_strict, at_date, simulation_state, ledger=None, form_id=None, **kw</string> </value>
<value> <string>section_category, section_category_strict, at_date, simulation_state, group_by=\'portal_type\', ledger=None, form_id=None, **kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......
......@@ -7,26 +7,94 @@ section_uid_list = portal.Base_getSectionUidListForSectionCategory(
from_date = portal.Base_getAccountingPeriodStartDateForSectionCategory(
section_category, at_date)
# XXX we need proxy role for that
active_process = portal.portal_activities.newActiveProcess()
ledger_obj_list = []
if ledger is not None:
if not (isinstance(ledger, list) or isinstance(ledger, tuple)):
ledger = (ledger,)
category_tool = portal.portal_categories
for item in ledger:
ledger_obj_list.append(category_tool.ledger.restrictedTraverse(item))
elif group_by in ('ledger', 'portal_type_ledger'):
raise ValueError("At least one Ledger is needed for group_by=%r" % group_by)
def _groupedJournalTupleDict():
portal_type_list = portal.getPortalAccountingTransactionTypeList()
default_search_kw = {
'simulation_state': simulation_state,
'accounting_transaction.section_uid': section_uid_list,
'operation_date': {'query': (from_date, at_date), 'range': 'minngt' },
}
journal_tuple_list = []
if group_by == 'ledger':
default_search_kw['portal_type'] = portal_type_list
for ledger_obj in ledger_obj_list:
journal_code = journal_lib = ledger_obj.getReference(ledger_obj.getId())
ledger_search_kw = default_search_kw.copy()
ledger_search_kw['default_ledger_uid'] = ledger_obj.getUid()
journal_tuple_list.append((journal_code, journal_lib, ledger_search_kw))
elif group_by == 'portal_type_ledger':
for ledger_obj in ledger_obj_list:
for portal_type in portal_type_list:
portal_type_obj = portal.portal_types[portal_type]
ledger_reference = ledger_obj.getReference(ledger_obj.getId())
journal_code = "%s: %s" % (portal_type_obj.getCompactTranslatedTitle(), ledger_reference)
journal_lib = "%s: %s" % (portal_type_obj.getTranslatedTitle(), ledger_reference)
portal_type_ledger_search_kw = default_search_kw.copy()
portal_type_ledger_search_kw['default_ledger_uid'] = ledger_obj.getUid()
portal_type_ledger_search_kw['portal_type'] = portal_type
journal_tuple_list.append((journal_code, journal_lib, portal_type_ledger_search_kw))
# group_by == 'portal_type' (Default)
else:
if ledger_obj_list:
default_search_kw['default_ledger_uid'] = [ ledger_obj.getUid() for ledger_obj in ledger_obj_list ]
for portal_type in portal_type_list:
portal_type_obj = portal.portal_types[portal_type]
journal_code = portal_type_obj.getCompactTranslatedTitle()
journal_lib = portal_type_obj.getTranslatedTitle()
portal_type_search_kw = default_search_kw.copy()
portal_type_search_kw['portal_type'] = portal_type
journal_tuple_list.append((journal_code, journal_lib, portal_type_search_kw))
return journal_tuple_list
priority = 4
# Proxy Role needed to create an 'Active Process'
active_process = context.AccountingTransactionModule_createActiveProcessForFrenchAccountingTransactionFile()
if tag is None:
tag = script.getId()
if aggregate_tag is None:
aggregate_tag = '%s:aggregate' % tag
for journal_code, journal_lib, search_kw in _groupedJournalTupleDict():
# This script is executed with Proxy Role, required to create 'Active Process'
this_journal_active_process = context.AccountingTransactionModule_createActiveProcessForFrenchAccountingTransactionFile()
portal.portal_catalog.searchAndActivate(
method_id='AccountingTransaction_postFECResult',
method_kw=dict(section_uid_list=section_uid_list, active_process=this_journal_active_process.getRelativeUrl()),
activate_kw=dict(tag=tag, priority=priority),
**search_kw)
for portal_type in portal.getPortalAccountingTransactionTypeList():
# XXX we need proxy role for that
this_portal_type_active_process = portal.portal_activities.newActiveProcess()
context.AccountingTransactionModule_viewFrenchAccountingTransactionFileForPortalType(
portal_type,
section_uid_list,
from_date,
at_date,
simulation_state,
ledger,
active_process.getRelativeUrl(),
this_portal_type_active_process.getRelativeUrl(),
tag,
aggregate_tag,
priority)
context.activate(
tag=aggregate_tag,
after_tag=tag,
activity='SQLQueue').AccountingTransactionModule_aggregateFrenchAccountingTransactionFileForOneJournal(
journal_code,
journal_lib,
active_process=active_process.getRelativeUrl(),
# Proxy Role needed to create an 'Active Process'
this_journal_active_process=this_journal_active_process.getRelativeUrl())
context.activate(after_tag=(tag, aggregate_tag)).AccountingTransactionModule_aggregateFrenchAccountingTransactionFile(
at_date,
......
......@@ -50,15 +50,7 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>section_category, section_category_strict, at_date, simulation_state, ledger, user_name, tag, aggregate_tag, **kw</string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
</tuple>
</value>
<value> <string>section_category, section_category_strict, at_date, group_by, simulation_state, ledger, user_name=None, tag=None, aggregate_tag=None, **kw</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......
......@@ -97,6 +97,7 @@
<string>your_section_category_strict</string>
<string>your_at_date</string>
<string>your_ledger</string>
<string>your_group_by</string>
</list>
</value>
</item>
......
portal = context.getPortalObject()
portal_categories = portal.portal_categories
search_kw = {
'simulation_state': simulation_state,
'accounting_transaction.section_uid': section_uid_list,
'operation_date': {'query': (from_date, at_date), 'range': 'minngt' },
'portal_type': portal_type,
}
if ledger is not None:
if isinstance(ledger, list) or isinstance(ledger, tuple):
ledger_uid_list = [portal_categories.ledger.restrictedTraverse(item).getUid() for item in ledger]
else:
ledger_uid_list = [portal_categories.ledger.restrictedTraverse(ledger).getUid(), ]
search_kw['default_ledger_uid'] = ledger_uid_list
method_kw = {
'active_process': this_portal_type_active_process,
'section_uid_list': section_uid_list,
}
activate_kw = {
'tag': tag,
'priority': priority,
}
portal.portal_catalog.searchAndActivate(
method_id='AccountingTransaction_postFECResult',
method_kw=method_kw,
activate_kw=activate_kw,
**search_kw)
context.activate(tag=aggregate_tag, after_tag=tag, activity='SQLQueue').AccountingTransactionModule_aggregateFrenchAccountingTransactionFileForPortalType(
portal_type=portal_type,
active_process=active_process,
this_portal_type_active_process=this_portal_type_active_process)
<journal xmlns:tal="http://xml.zope.org/namespaces/tal">
<JournalCode tal:content="options/portal_type/getCompactTranslatedTitle"></JournalCode>
<JournalLib tal:content="options/portal_type/getTranslatedTitle"></JournalLib>
<JournalCode tal:content="options/journal_code"></JournalCode>
<JournalLib tal:content="options/journal_lib"></JournalLib>
<tal:block tal:repeat="result options/result_list"><tal:block tal:replace="structure python: result.detail.decode('zlib')"/></tal:block>
</journal>
\ No newline at end of file
......@@ -161,7 +161,7 @@ class TestAccounting_l10n_fr(AccountingTestCase):
self.assertEqual(6, len(credit_list))
self.assertEqual(372, sum([float(x.text) for x in credit_list]))
def _FECWithLedger(self, ledger_list=None):
def _FECWithLedger(self, ledger_list=None, group_by=None):
self.setUpLedger()
account_module = self.portal.account_module
first = self._makeOne(
......@@ -215,6 +215,7 @@ class TestAccounting_l10n_fr(AccountingTestCase):
section_category_strict=False,
at_date=DateTime(2014, 12, 31),
simulation_state=['delivered'],
group_by=group_by,
ledger=ledger_list)
self.tic()
......@@ -249,25 +250,159 @@ class TestAccounting_l10n_fr(AccountingTestCase):
def test_FECWithOneLedger(self):
tree = self._FECWithLedger(['accounting/general'])
debit_list = tree.xpath("//Debit")
# 'Purchase Invoice Transaction' portal_type
journal_list = tree.xpath("//JournalCode[text()='Purchase Invoice Transaction']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Première Écriture'], ecriture_list)
debit_list = journal.xpath(".//Debit")
self.assertEqual(3, len(debit_list))
self.assertEqual(132, sum([float(x.text) for x in debit_list]))
credit_list = journal.xpath(".//Credit")
self.assertEqual(3, len(credit_list))
self.assertEqual(132, sum([float(x.text) for x in credit_list]))
# 'Sale Invoice Transaction' portal_type
journal_list = tree.xpath("//JournalCode[text()='Sale Invoice Transaction']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Seconde Écriture'], ecriture_list)
debit_list = journal.xpath(".//Debit")
self.assertEqual(3, len(debit_list))
self.assertEqual(240, sum([float(x.text) for x in debit_list]))
credit_list = journal.xpath(".//Credit")
self.assertEqual(3, len(credit_list))
self.assertEqual(240, sum([float(x.text) for x in credit_list]))
def test_FECWithMultipleLedger(self):
# group_by=portal_type by default
tree = self._FECWithLedger(['accounting/general', 'accounting/detailed'])
# 'Purchase Invoice Transaction' portal_type
journal_list = tree.xpath("//JournalCode[text()='Purchase Invoice Transaction']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Première Écriture'], ecriture_list)
debit_list = journal.xpath(".//Debit")
self.assertEqual(3, len(debit_list))
self.assertEqual(132, sum([float(x.text) for x in debit_list]))
credit_list = journal.xpath(".//Credit")
self.assertEqual(3, len(credit_list))
self.assertEqual(132, sum([float(x.text) for x in credit_list]))
# 'Sale Invoice Transaction' portal_type
journal_list = tree.xpath("//JournalCode[text()='Sale Invoice Transaction']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Seconde Écriture', 'Troisième Écriture'], ecriture_list)
debit_list = journal.xpath(".//Debit")
self.assertEqual(6, len(debit_list))
self.assertEqual(425, sum([float(x.text) for x in debit_list]))
credit_list = journal.xpath(".//Credit")
self.assertEqual(6, len(credit_list))
self.assertEqual(425, sum([float(x.text) for x in credit_list]))
def test_FECWithMultipleLedgerGroupByLedger(self):
tree = self._FECWithLedger(['accounting/general', 'accounting/detailed'], group_by='ledger')
# 'accounting/general' ledger
journal_list = tree.xpath("//JournalCode[text()='general']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Première Écriture', 'Seconde Écriture'], ecriture_list)
debit_list = journal.xpath(".//Debit")
self.assertEqual(6, len(debit_list))
self.assertEqual(372, sum([float(x.text) for x in debit_list]))
credit_list = tree.xpath("//Credit")
credit_list = journal.xpath(".//Credit")
self.assertEqual(6, len(credit_list))
self.assertEqual(372, sum([float(x.text) for x in credit_list]))
# 'accounting/detailed' ledger
journal_list = tree.xpath("//JournalCode[text()='detailed']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
def test_FECWithMultipleLedger(self):
tree = self._FECWithLedger(['accounting/general', 'accounting/detailed'])
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Troisième Écriture'], ecriture_list)
debit_list = tree.xpath("//Debit")
self.assertEqual(9, len(debit_list))
self.assertEqual(557, sum([float(x.text) for x in debit_list]))
debit_list = journal.xpath(".//Debit")
self.assertEqual(3, len(debit_list))
self.assertEqual(185, sum([float(x.text) for x in debit_list]))
credit_list = tree.xpath("//Credit")
self.assertEqual(9, len(credit_list))
self.assertEqual(557, sum([float(x.text) for x in credit_list]))
credit_list = journal.xpath(".//Credit")
self.assertEqual(3, len(credit_list))
self.assertEqual(185, sum([float(x.text) for x in credit_list]))
def test_FECWithMultipleLedgerGroupByLedgerAndPortalType(self):
tree = self._FECWithLedger(['accounting/general', 'accounting/detailed'], group_by='portal_type_ledger')
# 'Purchase Invoice Transaction' portal_type and 'accounting/general' ledger
journal_list = tree.xpath("//JournalCode[text()='Purchase Invoice Transaction: general']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Première Écriture'], ecriture_list)
debit_list = journal.xpath(".//Debit")
self.assertEqual(3, len(debit_list))
self.assertEqual(132, sum([float(x.text) for x in debit_list]))
credit_list = journal.xpath(".//Credit")
self.assertEqual(3, len(credit_list))
self.assertEqual(132, sum([float(x.text) for x in credit_list]))
# 'Sale Invoice Transaction' portal_type and 'accounting/general' ledger
journal_list = tree.xpath("//JournalCode[text()='Sale Invoice Transaction: general']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Seconde Écriture'], ecriture_list)
debit_list = journal.xpath(".//Debit")
self.assertEqual(3, len(debit_list))
self.assertEqual(240, sum([float(x.text) for x in debit_list]))
credit_list = journal.xpath(".//Credit")
self.assertEqual(3, len(credit_list))
self.assertEqual(240, sum([float(x.text) for x in credit_list]))
# 'Sale Invoice Transaction' portal_type and 'accounting/detailed' ledger
journal_list = tree.xpath("//JournalCode[text()='Sale Invoice Transaction: detailed']/..")
self.assertEqual(1, len(journal_list))
journal = journal_list[0]
ecriture_list = sorted([x.text.encode('utf-8') for x in journal.xpath(".//EcritureLib")])
self.assertEqual(['Troisième Écriture'], ecriture_list)
debit_list = journal.xpath(".//Debit")
self.assertEqual(3, len(debit_list))
self.assertEqual(185, sum([float(x.text) for x in debit_list]))
credit_list = journal.xpath(".//Credit")
self.assertEqual(3, len(credit_list))
self.assertEqual(185, sum([float(x.text) for x in credit_list]))
def test_suite():
suite = unittest.TestSuite()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment