Skip to content

Instantly share code, notes, and snippets.

@nseinlet
Created May 6, 2019 07:37
Show Gist options
  • Save nseinlet/133863f0e1eae8fd190e03efb420f595 to your computer and use it in GitHub Desktop.
Save nseinlet/133863f0e1eae8fd190e03efb420f595 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# coding: utf-8
import odoolib
import sys
import random
import threading
from datetime import datetime, date
def closing(username, pos_name=None):
    """Close the current POS session of one point of sale and time the call.

    Connects to the Odoo server on localhost as *username* (the login is also
    used as the password), finds the ``pos.config`` whose name matches
    *pos_name*, calls ``open_session_cb`` on it, then calls
    ``action_pos_session_closing_control`` on its current session.  The
    elapsed time of that last call and the number of orders found in the
    session are printed.

    Args:
        username: Odoo login, also used as the password.
        pos_name: Text matched with ``ilike`` against ``pos.config`` names.
            When omitted, the module-level ``pos_id`` is used, which is what
            this function always did before the parameter existed.

    Raises:
        ValueError: If no ``pos.config`` matches *pos_name*.
    """
    if pos_name is None:
        # Legacy path: read the module-level global.  The launcher loop
        # rebinds that global for every worker, so a thread that is slow to
        # start can observe another worker's value; pass ``pos_name``
        # explicitly to avoid that.
        pos_name = pos_id

    connection = odoolib.get_connection(hostname="localhost", database="12hlc", login=username, password=username, port=80)
    config_model = connection.get_model('pos.config')
    session_model = connection.get_model('pos.session')
    order_model = connection.get_model('pos.order')

    # Search config
    config_ids = config_model.search([('name', 'ilike', pos_name)])
    if not config_ids:
        raise ValueError("No pos.config found matching %r" % (pos_name,))
    # Always keep a one-element list.  Previously several matches were
    # collapsed to a bare id while a single match stayed a list, so the
    # ``[0]`` applied to the ``read`` result below saw two different shapes.
    config_ids = config_ids[:1]

    # Open session.  The call is issued twice, exactly as the script always
    # did; presumably the first call creates the session and the second one
    # finds it already open -- TODO confirm before removing either call.
    config_model.open_session_cb(config_ids)
    config_model.open_session_cb(config_ids)

    pos_config = config_model.read(config_ids, ['current_session_id', 'journal_ids', 'pricelist_id'])[0]
    session_id = pos_config['current_session_id'][0]
    order_ids = order_model.search([['session_id', '=', session_id]])

    t1 = datetime.now()
    session_model.action_pos_session_closing_control(session_id)
    t2 = datetime.now()
    # total_seconds() keeps fractions of a second and any day component;
    # ``.seconds`` alone truncates both.
    print("%s closed session %s in %s seconds (%s orders)" % (username, session_id, (t2 - t1).total_seconds(), len(order_ids)))
def selling(username, size, pos_name=None):
    """Create *size* synthetic POS orders in the current session of one POS.

    Connects to the Odoo server on localhost as *username* (the login is also
    used as the password), finds the ``pos.config`` whose name matches
    *pos_name*, calls ``open_session_cb`` on it, and then submits *size*
    orders one by one through ``pos.order.create_from_ui``.  Every order has
    fixed amounts (55000 total including 5000 tax), a single line for a
    randomly chosen product that is sellable and available in the POS, and a
    single payment on a randomly chosen open bank statement of the session.
    The value returned by each ``create_from_ui`` call is printed.

    Args:
        username: Odoo login, also used as the password.
        size: Number of orders to create.
        pos_name: Text matched with ``ilike`` against ``pos.config`` names.
            When omitted, the module-level ``pos_id`` is used, which is what
            this function always did before the parameter existed.

    Raises:
        ValueError: If no ``pos.config`` matches *pos_name*, if no product is
            available for sale in the POS, or if the session has no open
            bank statement to register payments on.
    """
    if pos_name is None:
        # Legacy path: read the module-level global.  The launcher loop
        # rebinds that global for every worker, so a thread that is slow to
        # start can observe another worker's value; pass ``pos_name``
        # explicitly to avoid that.
        pos_name = pos_id

    connection = odoolib.get_connection(hostname="localhost", database="12hlc", login=username, password=username, port=80)
    config_model = connection.get_model('pos.config')
    order_model = connection.get_model('pos.order')
    product_model = connection.get_model("product.product")
    abs_model = connection.get_model('account.bank.statement')

    # Search config
    config_ids = config_model.search([('name', 'ilike', pos_name)])
    if not config_ids:
        raise ValueError("No pos.config found matching %r" % (pos_name,))
    # Always keep a one-element list.  Previously several matches were
    # collapsed to a bare id while a single match stayed a list, so the
    # ``[0]`` applied to the ``read`` result below saw two different shapes.
    config_ids = config_ids[:1]

    # Open session.  The call is issued twice, exactly as the script always
    # did; presumably the first call creates the session and the second one
    # finds it already open -- TODO confirm before removing either call.
    config_model.open_session_cb(config_ids)
    config_model.open_session_cb(config_ids)

    pos_config = config_model.read(config_ids, ['current_session_id', 'journal_ids', 'pricelist_id'])[0]
    session_id = pos_config['current_session_id'][0]
    pricelist_id = pos_config['pricelist_id'][0]

    product_ids = product_model.search([['sale_ok', '=', True], ['available_in_pos', '=', True]])
    if not product_ids:
        # random.choice() on an empty list would fail with a bare IndexError.
        raise ValueError("No product is both sale_ok and available_in_pos")

    abs_ids = abs_model.search_read([['state', '=', 'open'], ['pos_session_id', '=', session_id]], ['account_id', 'journal_id'])
    if not abs_ids:
        raise ValueError("No open bank statement for POS session %s" % (session_id,))

    for i in range(size):
        # The loop index plus a timestamp serves as the order identifier.
        uid = '%s_%s' % (i, datetime.now().isoformat())
        payment_method = random.choice(abs_ids)
        order_data = {
            'amount_paid': 55000,
            'amount_return': 0,
            'amount_total': 55000,
            'amount_tax': 5000,
            'creation_date': datetime.now().isoformat(),
            'customer_count': 1,
            'fiscal_position_id': False,
            'floor': False,
            'floor_id': False,
            'loyalty_points': 0,
            'name': 'order_%s' % uid,
            'partner_id': False,
            'pos_session_id': session_id,
            'pricelist_id': pricelist_id,
            'sequence_number': i,
            'table_id': False,
            'uid': uid,
            'user_id': False,
            'lines': [[0, 0, {
                'discount': 0,
                'id': 1,
                'pack_lot_ids': [],
                'price_subtotal': 50000,
                'price_subtotal_incl': 55000,
                'price_unit': 55000,
                'product_id': random.choice(product_ids),
                'qty': 1,
                # Tax id 4 is hard-coded; it must exist in the target
                # database.
                'tax_ids': [[6, 0, [4]]],
            }]],
            'statement_ids': [[0, 0, {
                'account_id': payment_method['account_id'][0],
                'amount': 55000,
                'journal_id': payment_method['journal_id'][0],
                'name': date.today().strftime("%Y-%m-%d"),
                'statement_id': payment_method['id'],
            }]],
        }
        res = order_model.create_from_ui([{
            'id': uid,
            'data': order_data,
            'to_invoice': False,
        }])
        print(username, res)
if __name__ == "__main__":
    # Command-line entry point: start one worker thread per simulated POS
    # user and wait for all of them to finish.
    #
    # Only the two documented tasks are accepted.  Previously anything other
    # than "sell" (including a typo) fell through to the session-closing
    # branch.
    if len(sys.argv) < 3 or sys.argv[2] not in ("sell", "close"):
        print("Usage : lt_pos.py concurrency task")
        print("")
        print("Ex : lt_pos.py 3 sell")
        print(" lt_pos.py 4 close")
        print(" To choose the number of pos order to create:")
        print(" lt_pos.py 4 sell 1000")
    else:
        concurrency = int(sys.argv[1])
        task = sys.argv[2]

        # Number of orders per worker; parsed once since it does not depend
        # on the worker.
        sizing = 1000
        if task == "sell" and len(sys.argv) > 3:
            sizing = int(sys.argv[3])

        lst_thd = []
        # POS numbering starts at 5: workers use 00005, 00006, ...
        for i in range(5, concurrency + 5):
            # Zero-padded 5-character identifier, assigned in one step so the
            # module-level name never briefly holds the unpadded value.
            # NOTE: selling() and closing() read this module-level ``pos_id``
            # when they run, and this loop rebinds it on every iteration, so
            # a worker that is slow to start may observe a later value.
            pos_id = ("00000%s" % i)[-5:]
            username = 'sm%s@mydb.com' % pos_id
            if task == "sell":
                thd = threading.Thread(target=selling, args=(username, sizing))
            else:
                thd = threading.Thread(target=closing, args=(username,))
            thd.start()
            lst_thd.append(thd)

        for thd in lst_thd:
            thd.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment