Created
January 3, 2020 10:06
-
-
Save curious-codr/f2ef04438b3f332f4420c490ffe01d03 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json

# Default application configuration, kept as a JSON document so it can be
# parsed with ``json.loads``. JSON requires double-quoted strings, so every
# key and value (including the nested processor map) uses double quotes.
string = """{
    "IMAGE_SOURCE_DIR": "documents_runtime",
    "TEMPLATE_FOLDER": "documents_metadata",
    "NORTHWELL_ENDPOINT": "https://us-central1-springmldemoproject.cloudfunctions.net/hello_gcs",
    "PDF_TO_IMAGE_ENDPOINT": "https://us-central1-springmldemoproject.cloudfunctions.net/pdf_to_image",
    "TEMPLATE_PROCESSOR_MAP": {
        "northwell_template": "NorthwellProcessor()",
        "realestate_template": "RealEstateProcessor()",
        "cbre_template": "CBREProcessor()",
        "gg_bearings_template": "GGBearingsProcessor()",
        "MouDocuments": "CBREMOUProcessor()",
        "RegusDocuments": "CBREESUITERegusProcessor()",
        "WeworkDocuments": "CBREESUITEWeworkProcessor()",
        "test": "CBREIncomeLeaseProcessor()"
    },
    "DEFAULT_PROCESSOR": "DocumentProcessor",
    "IMAGE_DESTINATION_DIR": "invoice_documents_processed",
    "DOCUMENT_KEY": "document_template"
}"""

# Parsed configuration dictionary read by the rest of this module.
temp = json.loads(string)
class AppConstants:
    """Application constants read from the parsed ``temp`` configuration.

    Each attribute is looked up in ``temp`` by its own name; the second
    argument of ``temp.get`` is the value used when the key is absent.
    """

    DOCUMENT_KEY = temp.get("DOCUMENT_KEY", "document_template")
    IMAGE_SOURCE_DIR = temp.get("IMAGE_SOURCE_DIR", "documents_runtime")
    IMAGE_DESTINATION_DIR = temp.get("IMAGE_DESTINATION_DIR", "invoice_documents_processed")
    TEMPLATE_FOLDER = temp.get("TEMPLATE_FOLDER", "documents_metadata")
    NORTHWELL_ENDPOINT = temp.get(
        "NORTHWELL_ENDPOINT",
        "https://us-central1-springmldemoproject.cloudfunctions.net/hello_gcs",
    )
    PDF_TO_IMAGE_ENDPOINT = temp.get(
        "PDF_TO_IMAGE_ENDPOINT",
        "https://us-central1-springmldemoproject.cloudfunctions.net/pdf_to_image",
    )
    # Maps a template/document-type name to the processor expression string
    # associated with it.
    TEMPLATE_PROCESSOR_MAP = temp.get(
        "TEMPLATE_PROCESSOR_MAP",
        {
            'northwell_template': 'NorthwellProcessor()',
            'realestate_template': 'RealEstateProcessor()',
            'cbre_template': 'CBREProcessor()',
            'gg_bearings_template': 'GGBearingsProcessor()',
            'MouDocuments': 'CBREMOUProcessor()',
            'RegusDocuments': 'CBREESUITERegusProcessor()',
            'WeworkDocuments': 'CBREESUITEWeworkProcessor()',
            'test': 'CBREIncomeLeaseProcessor()',
        },
    )
    DEFAULT_PROCESSOR = temp.get("DEFAULT_PROCESSOR", "DocumentProcessor")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from flask import Blueprint, request, \ | |
url_for, jsonify | |
from flask_restplus import Resource, Api, fields, reqparse, inputs | |
from werkzeug.datastructures import FileStorage | |
from src.doc_types.DocType import DocType | |
from templatestore import storage | |
from google.api_core.exceptions import NotFound | |
from datetime import datetime | |
from src.utils.Constants import Constants | |
from src.utils.other_utils import convert_image_format | |
from io import BytesIO | |
class MyApi(Api):
    """``Api`` subclass that picks the URL scheme of the specs endpoint."""

    @property
    def specs_url(self):
        """Return the external URL of the 'specs' endpoint.

        Plain ``http`` is used only when the base URL contains
        ``127.0.0.1`` or ``localhost``; any other host gets ``https``.
        """
        if '127.0.0.1' in self.base_url or 'localhost' in self.base_url:
            scheme = 'http'
        else:
            scheme = 'https'
        specs_endpoint = self.endpoint('specs')
        return url_for(specs_endpoint, _external=True, _scheme=scheme)
# Blueprint and API object that every route in this module is attached to.
crud = Blueprint('crud', __name__)
api = MyApi(crud, title='Document Extraction')

# Request/response models, declared from the innermost entity outwards.
entity = api.model(
    'entity',
    dict(
        keyword=fields.String(),
        keyword_type=fields.String(),
        aliases=fields.List(fields.String),
    ),
)
invoice_keyword_aliases = api.model(
    "document_template",
    dict(keyword_mapping=fields.List(fields.Nested(entity))),
)
template_model = api.model(
    'template',
    dict(document_template=fields.Nested(invoice_keyword_aliases)),
)

# Parser for multipart uploads: the file itself, the user name header and a
# boolean form flag.
file_upload = reqparse.RequestParser()
file_upload.add_argument('image', type=FileStorage, location='files')
file_upload.add_argument('username', location='headers')
file_upload.add_argument('show_table', type=inputs.boolean, location='form')

# Parser for routes that only need the user name header ('test' when absent).
header_parser = reqparse.RequestParser()
header_parser.add_argument('username', location='headers', default='test')
@api.route('/templates')
class Templates(Resource):
    """Lists the template names found under the caller's storage prefix."""

    @api.expect(header_parser)
    def get(self):
        # The prefix comes from the ``username`` header via get_username().
        user_prefix = get_username(header_parser)
        listing = storage.list_blobs_with_prefix(user_prefix + '/', '/')
        # The third '/'-separated segment of each blob name is the file name;
        # empty segments are skipped.
        file_names = (str(blob.name).split('/')[2] for blob in listing)
        # Keep only the part of each name that precedes '.json'.
        return jsonify([fname.split('.json')[0] for fname in file_names if fname])
@api.route('/template/<string:template_name>')
class Template(Resource):
    """Read, store or delete a single template under the caller's prefix.

    NOTE(review): this module later declares another resource class that is
    also named ``Template`` (the import route), which rebinds the
    module-level name.
    """

    @api.expect(header_parser)
    def get(self, template_name):
        """Return the stored template as JSON, or 404 when it is missing."""
        prefix = get_username(header_parser)
        blob_name = prefix + '/' + template_name + '.json'
        try:
            template_json = storage.read_file(blob_name)
        except NotFound:
            # Same response the import route gives for an unknown template.
            return {'error': 'Template not found'}, 404, {'ContentType': 'application/json'}
        return json.loads(template_json)

    @api.expect(template_model)
    def post(self, template_name):
        """Store the JSON request body as the named template."""
        prefix = get_username(header_parser)
        template_name = prefix + '/' + template_name + '.json'
        # silent=True yields None for a missing or malformed body, so the
        # request can be rejected below instead of failing on item assignment.
        json_data = request.get_json(silent=True)
        if not isinstance(json_data, dict):
            return (
                {'error': 'Request body must be a JSON object'},
                400,
                {'ContentType': 'application/json'},
            )
        # NOTE(review): everything after the first '.' is dropped, so a dot
        # in the user name or template name truncates this value - confirm
        # whether that is intended.
        json_data['template_name'] = template_name.split('.')[0]
        storage.upload_file(template_name, json.dumps(json_data), 'application/json')
        return {'success': True}, 200, {'ContentType': 'application/json'}

    @api.expect(header_parser)
    def delete(self, template_name):
        """Delete the named template from storage."""
        prefix = get_username(header_parser)
        template_name = prefix + '/' + template_name + '.json'
        storage.delete_file(template_name)
        return {'success': True}, 200, {'ContentType': 'application/json'}
@api.route('/template/test_json/<string:template_name>')
class TestTemplate2(Resource):
    """Runs a template against a base64-encoded image sent as JSON."""

    def post(self, template_name):
        """Upload a base64-encoded image from the JSON body and process it.

        The body must be a JSON object with the keys ``image`` (base64 text,
        optionally preceded by a ``...,`` data-URL header), ``filename`` and
        ``username``.
        """
        import base64

        # is_json also accepts parameters such as "; charset=utf-8", which an
        # exact comparison with 'application/json' would reject.
        if not request.is_json:
            return jsonify({'error': 'Content-type must be application/json!', 'status_code':401})
        print(request.content_type)
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'No valid request body, json missing!'})

        missing = [key for key in ('image', 'filename', 'username') if key not in data]
        if missing:
            response = jsonify({'error': 'Missing required field(s): ' + ', '.join(missing)})
            response.status_code = 400
            return response

        img_data = data['image']
        prefix = data['username'].lower()
        filename = data['filename'].replace(' ', '-').replace('#', '').lower()
        # Decode whatever follows the first comma; when there is no comma,
        # find() returns -1 and the whole string is decoded.
        image = base64.b64decode(img_data[img_data.find(',') + 1:])
        # Store the decoded bytes in the bucket under the user's prefix.
        img_public_url = storage.upload_from_string(prefix + '/' + filename, image)
        print(img_public_url)

        doc = DocType()
        extractTables = False
        print(extractTables)
        result = doc.process(template_name, filename, prefix=prefix, extractTables=extractTables)
        print("RESULT from crud.py", result)
        return jsonify(result)
@api.route('/template/admin/test')
class AdminPage(Resource):
    """Returns the parsed application configuration dictionary."""

    def get(self):
        """Return the ``temp`` dictionary of ``src.utils.AppConstants`` as JSON."""
        print(request.content_type)
        # Import the dictionary by name: ``from ... import`` statements never
        # bind the bare name ``src``, so ``src.utils.AppConstants.temp`` would
        # raise NameError.
        from src.utils.AppConstants import temp as temp_json
        print(temp_json)
        print("Admin page loading ...")
        return jsonify(temp_json)
@api.route('/template/test/<string:template_name>')
class TestTemplate(Resource):
    """Runs a template against an uploaded file."""

    @api.expect(file_upload)
    def post(self, template_name):
        """Upload the posted file, process it and return the result.

        The response holds ``result`` (the output of ``DocType.process``) and
        ``outputImage`` (the base64-encoded boundaries image, or ``None`` when
        it cannot be read).
        """
        import base64

        print(request.content_type)
        prefix = get_username(file_upload)
        args = file_upload.parse_args()
        image = args['image']
        if image is None or not image.filename:
            return (
                {'error': 'No image file was uploaded'},
                400,
                {'ContentType': 'application/json'},
            )

        img_name = image.filename.replace(' ', '-').replace('#', '').lower()
        # Strip every dot except the one before the extension. A name with no
        # dot at all is left untouched rather than becoming ".name".
        stem, dot, suffix = img_name.rpartition('.')
        if dot:
            img_name = f"{stem.replace('.', '')}.{suffix}"
        print("IMAGENAME:", img_name, " image.filename:", image.filename)

        payload = image.stream.read()
        try:
            img_public_url = storage.upload_from_string(prefix + '/' + img_name, payload)
            print("PUBLIC:", img_public_url)
        except Exception as e:
            # Best effort: processing is still attempted after a failed upload.
            print("Upload error")
            print(e)

        doc = DocType()
        extractTables = args['show_table']
        print(extractTables)
        result = doc.process(template_name, img_name, prefix=prefix, extractTables=extractTables)

        # Extension substituted for ".pdf" in the boundaries file name.
        boundary_ext = ".png" if '.png' in img_name else ".jpg"
        outputImage = None
        try:
            boundary_blob = f'{prefix}/boundaries_{str(img_name).replace(".pdf", boundary_ext)}'
            boundingBoxBlob = storage.read_file_from_bucket(boundary_blob, Constants.IMAGE_SOURCE_DIR)
            outputImage = base64.encodebytes(boundingBoxBlob).decode('ascii')
        except Exception as e:
            # Best effort: the boundaries image is optional in the response.
            print("Couldn't find boundary file")
            print(e)

        print("RESULT from crud.py", result)
        return {"result": result, "outputImage": outputImage}
@api.route('/template/import/<string:template_name>')
class Template(Resource):
    """Copies a generic template into the caller's own template prefix."""

    @api.expect(header_parser)
    def get(self, template_name):
        user_prefix = get_username(header_parser)
        source_blob = 'generic/templates/' + template_name + '.json'
        stamp = datetime.utcnow()
        try:
            contents = storage.read_file(source_blob)
        except NotFound:
            return {'error': 'Template not found'}, 404, {'ContentType': 'application/json'}
        # The copy is stored as "<prefix>/<name>-<UTC timestamp>.json".
        target_blob = ''.join(
            [user_prefix, '/', template_name, "-", stamp.strftime("%Y%m%d%H%M%S"), '.json']
        )
        storage.upload_file(target_blob, contents, 'application/json')
        return {'success': True}, 200, {'ContentType': 'application/json'}
@api.route('/templates/generic')
class Templates(Resource):
    """Lists the template names stored under 'generic/templates'."""

    def get(self):
        listing = storage.list_blobs_with_prefix('generic/templates' + '/', '/')
        names = []
        for blob in listing:
            # The third '/'-separated segment of the blob name is the file name.
            file_name = str(blob.name).split('/')[2]
            if not file_name:
                continue
            # Keep only the part of the name that precedes '.json'.
            names.append(file_name.split('.json')[0])
        return jsonify(names)
@api.route('/templates/types')
class TemplateTypes(Resource):
    """Lists the template types found under the caller's sample-docs prefix."""

    @api.expect(header_parser)
    def get(self):
        """Return the distinct template type names as a sorted JSON array.

        A built-in list of type names is returned when the listing fails.
        """
        args = header_parser.parse_args()
        username = args.username.lower()
        prefix = username + "/" + Constants.SAMPLE_DOCS
        try:
            blobs = storage.list_blobs_with_prefix_from_bucket(
                prefix, None, Constants.TEMPLATE_FOLDER)
            # The second-to-last path segment of each blob name is taken as
            # the type; names ending in "/" or containing 'default' are
            # skipped.
            templates = {
                blob.name.split("/")[-2]
                for blob in blobs
                if not blob.name.endswith("/") and 'default' not in blob.name
            }
        except Exception as e:
            # Catch Exception rather than everything, so SystemExit and
            # KeyboardInterrupt still propagate, and report what failed.
            print("Couldn't list template types")
            print(e)
            templates = ['UB-04-form','birth-certificate','drivers-license','invoice','passport','w2-form']
        # Sorting gives a stable order; set iteration order is arbitrary.
        return jsonify(sorted(set(templates)))
def add_template_hints(template_name, advanced_fields):
    """Load a stored template and copy the supplied hints onto its keywords.

    ``advanced_fields['add_hints']`` is a list of dicts with the keys
    ``keyword`` and ``hint``. Every entry of the template's keyword mapping
    whose ``keyword`` equals a hint's ``keyword`` gets that hint's ``hint``
    value. The modified template dictionary is returned; nothing is written
    back to storage here.
    """
    requested_hints = advanced_fields['add_hints']
    template = json.loads(storage.read_file(template_name))
    for entry in template[Constants.DOCUMENT_KEY]['keyword_mapping']:
        for candidate in requested_hints:
            if entry['keyword'] != candidate['keyword']:
                continue
            entry['hint'] = candidate['hint']
    return template
def get_username(headers):
    """Build the storage prefix ``<username>/templates`` for this request.

    Args:
        headers: A request parser exposing ``parse_args()``; the parsed
            result must provide a ``username`` attribute.

    Returns:
        The lower-cased user name followed by ``/templates``.
    """
    args = headers.parse_args()
    # ``file_upload`` declares ``username`` without a default, so the parsed
    # value can be None when the header is absent. Fall back to 'test', the
    # default that ``header_parser`` declares for the same header.
    username = args.username or 'test'
    return username.lower() + "/" + "templates"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment