-
-
Save dirkakrid/3bb38a759329960ba4db4c70f602c6a3 to your computer and use it in GitHub Desktop.
Python Flask REST server (works for me on Python 2.7 and 3.4; the tests were run on 2.7)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask | |
from flask import g, abort, redirect, Response, request | |
import json | |
import sqlite3 | |
import datetime | |
app = Flask(__name__) | |
@app.before_request
def db_connect():
    """Open the SQLite database for the current request.

    The connection and a cursor created from it are stored on ``flask.g``
    as ``g.conn`` and ``g.cursor`` so the request handlers can reach them.
    """
    type_detection = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
    connection = sqlite3.connect('people.db', detect_types=type_detection)
    g.conn = connection
    g.cursor = connection.cursor()
@app.after_request
def db_disconnect(response):
    """Commit the request's work and release its database handles.

    Args:
        response: The response object Flask is about to send.

    Returns:
        The same response object, unchanged.
    """
    g.conn.commit()
    g.cursor.close()
    # db_connect opens a fresh connection for every request, so the
    # connection must be closed here; otherwise each request leaks one.
    g.conn.close()
    return response
def query_db(query, data=(), none=False):
    """Run a parameterized statement on the request's cursor.

    Args:
        query: SQL text, using ``?`` placeholders.
        data: Sequence of values bound to the placeholders.
        none: When true, the fetched rows are discarded and ``None`` is
            returned (used by callers that only care about the side effect).

    Returns:
        The list of fetched rows, or ``None`` when ``none`` is true.
    """
    cursor = g.cursor
    cursor.execute(query, data)
    rows = cursor.fetchall()
    if none:
        return None
    return rows
@app.route("/")
def index():
    """Return a JSON document listing every operation the service offers."""
    # (HTTP method, URL pattern, description) for each operation.
    catalogue = (
        ("GET", "/", "This list of operations"),
        ("GET", "/people", "Lists people"),
        ("GET", "/people/id/<int:id>", "Retrieves specified person from List"),
        ("POST", "/people", "adds person to List"),
        ("PUT", "/people/id/<int:id>", "updates person on List"),
        ("DELETE", "/people/id/<int:id>", "deletes person from List"),
        ("DELETE", "/people", "emptys List"),
    )
    operations = [
        {"method": verb, "pattern": pattern, "description": summary}
        for verb, pattern, summary in catalogue
    ]
    payload = json.dumps({"operations": operations})
    # The envelope is assembled by hand so its exact byte layout is fixed.
    body = '{"Message":"Operations List","result":' + payload + '}'
    return Response(body, status=200, mimetype='application/json')
@app.route("/people", methods=['GET'])
def people():
    """Return every row of the ``people`` table inside a JSON envelope."""
    rows = query_db("SELECT * FROM people")
    body = '{"Message":"Records","results":' + json.dumps(rows) + '}'
    return Response(body, status=200, mimetype='application/json')
@app.route("/people/id/<int:id>", methods=['GET'])
def get_person(id):
    """Return the person with the given id, or 404 when there is none.

    Args:
        id: Primary key taken from the URL (already converted to ``int``
            by the route converter).

    Returns:
        A 200 JSON response whose ``result`` is the list of matching rows.
    """
    rows = query_db("SELECT * FROM people WHERE id = ?", [id])
    if not rows:
        abort(404)
    # Embed the serialized rows directly. Wrapping them in double quotes, as
    # was done before, yields invalid JSON because the rows themselves
    # contain quoted strings.
    body = '{"Message":"Record","result":' + json.dumps(rows) + '}'
    return Response(body, status=200, mimetype='application/json')
@app.route("/people", methods=['POST'])
def add_person():
    """Insert a person described by the JSON request body.

    The body must be a JSON object with ``firstname`` and ``lastname`` keys.

    Returns:
        201 with ``{"Message":"Added"}`` on success, or 400 with a JSON
        error message when the body is missing, malformed, or incomplete.
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising, so every bad-input case is reported the same way below.
    req_json = request.get_json(silent=True)
    if (not isinstance(req_json, dict)
            or 'firstname' not in req_json
            or 'lastname' not in req_json):
        return Response('{"Message":"firstname and lastname are required"}',
                        status=400, mimetype='application/json')
    query_db("INSERT INTO people (firstname, lastname) VALUES (?,?)",
             [req_json['firstname'], req_json['lastname']], True)
    return Response('{"Message":"Added"}', status=201,
                    mimetype='application/json')
@app.route("/people/id/<int:id>", methods=['PUT'])
def update_person(id):
    """Overwrite the names of the person with the given id.

    The body must be a JSON object with ``firstname`` and ``lastname`` keys.

    Args:
        id: Primary key taken from the URL.

    Returns:
        202 with ``{"Message":"Updated"}`` once the UPDATE has been issued
        (also when no row has that id), or 400 with a JSON error message
        when the body is missing, malformed, or incomplete.
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising, so every bad-input case is reported the same way below.
    req_json = request.get_json(silent=True)
    if (not isinstance(req_json, dict)
            or 'firstname' not in req_json
            or 'lastname' not in req_json):
        return Response('{"Message":"firstname and lastname are required"}',
                        status=400, mimetype='application/json')
    query_db("UPDATE people SET firstname = ?, lastname = ? WHERE id = ?",
             [req_json['firstname'], req_json['lastname'], str(id)], True)
    return Response('{"Message":"Updated"}', status=202,
                    mimetype='application/json')
@app.route("/people/id/<int:id>", methods=['DELETE'])
def remove_person(id):
    """Delete the person with the given id and acknowledge with a 202.

    The response is the same whether or not a row with that id existed.
    """
    params = [str(id)]
    query_db("DELETE FROM people WHERE id = ?", params, True)
    body = '{"Message":"Deleted"}'
    return Response(body, status=202, mimetype='application/json')
@app.route("/people", methods=['DELETE'])
def empty_people():
    """Delete every person and reset the table's auto-increment counter."""
    statements = (
        ("DELETE FROM people", ()),
        # Dropping the sqlite_sequence entry makes ids start over.
        ("DELETE from sqlite_sequence where name=?", ['people']),
    )
    for sql, params in statements:
        query_db(sql, params)
    body = '{"Message":"Deleted All"}'
    return Response(body, status=202, mimetype='application/json')
@app.errorhandler(404)
def page_not_found(e):
    """Answer 404 errors with a JSON body instead of Flask's default page."""
    body = '{"Message":"Resource Does Not Exist"}'
    return Response(body, status=404, mimetype='application/json')
if __name__ == "__main__":
    import os

    # The interactive debugger lets anyone who can reach the server execute
    # arbitrary code, and this server listens on every interface. Debug mode
    # is therefore opt-in (FLASK_DEBUG=1) instead of always enabled.
    debug_enabled = os.environ.get('FLASK_DEBUG', '') == '1'
    app.run(host='0.0.0.0', port=8000, debug=debug_enabled)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- One row per person managed by the REST service.
-- AUTOINCREMENT makes SQLite keep the id counter in sqlite_sequence.
CREATE TABLE "people" (
    "id"        INTEGER PRIMARY KEY AUTOINCREMENT,
    "firstname" TEXT,
    "lastname"  TEXT
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create the CD2.utils.{form,xhr,xhr.CB} namespaces, reusing any that exist.
(function (root) {
    var cd2 = root.CD2 = root.CD2 || {};
    var utils = cd2.utils = cd2.utils || {};
    utils.form = utils.form || {};
    var xhrNs = utils.xhr = utils.xhr || {};
    xhrNs.CB = xhrNs.CB || {};
}(window));
//
// Default xhr callback. Called with the XMLHttpRequest as `this`; installs an
// onreadystatechange handler that logs the request once it reaches DONE.
//
window.CD2.utils.xhr.CB.logMsgOnDone = function () {
    this.onreadystatechange = function () {
        if (this.readyState != this.DONE) {
            return;
        }
        console.log('data sent', this);
    };
};
//
// Returns xhrCB when it is a function, otherwise the default logging callback.
// The candidate is taken as a parameter and the result is returned, because a
// helper cannot rebind its caller's local variable. Assigning to a bare
// `xhrCB` here only created an accidental global.
//
window.CD2.utils.xhr.ensureXHRCB = function (xhrCB) {
    if (typeof xhrCB === 'function') {
        return xhrCB;
    }
    return window.CD2.utils.xhr.CB.logMsgOnDone;
};
//
// Send JSON via xhr.
//   data   - request body (a JSON string), or null/undefined for no body
//   action - URL to send the request to
//   method - HTTP method, e.g. 'GET' or 'POST'
//   xhrCB  - optional function called with the XMLHttpRequest as `this`
//            before the request is sent; defaults to CB.logMsgOnDone
//
window.CD2.utils.xhr.sendJSON = function (data, action, method, xhrCB) {
    if (typeof xhrCB !== 'function') {
        xhrCB = window.CD2.utils.xhr.CB.logMsgOnDone;
    }

    // construct an HTTP request
    var xhr = new XMLHttpRequest();
    xhr.open(method, action, true);
    xhr.setRequestHeader('Content-Type', 'application/json; charset=UTF-8');

    // Run the callback before send() so it can still configure the request
    // (extra headers, timeout, event handlers); after send() it is too late.
    xhrCB.call(xhr);

    // send the collected data as JSON
    if (data != null) {
        xhr.send(data);
    } else {
        xhr.send();
    }
};
//
// Makes the form matched by `selector` submit its named inputs as a JSON body
// via xhr instead of a regular form submission.
//   selector - CSS selector of the form element
//   xhrCB    - optional function called with the XMLHttpRequest as `this`, so
//              you can operate directly on the xhr; defaults to CB.logMsgOnDone
// Does nothing when the selector matches no element.
//
window.CD2.utils.form.postJSONBodyOnSubmit = function (selector, xhrCB) {
    // `var` keeps the form local instead of leaking an implicit global.
    var form = document.querySelector(selector);

    if (typeof xhrCB !== 'function') {
        xhrCB = window.CD2.utils.xhr.CB.logMsgOnDone;
    }

    // Test the element that was looked up. The old check referenced an
    // undefined `obj`, which threw a ReferenceError on every call.
    if (!(form instanceof HTMLElement)) {
        return;
    }

    form.onsubmit = function (e) {
        // stop the regular form submission
        e.preventDefault();

        // collect the form data while iterating over the inputs
        var data = {};
        for (var i = 0; i < form.elements.length; i++) {
            var input = form.elements[i];
            if (input.name) {
                data[input.name] = input.value;
            }
        }

        window.CD2.utils.xhr.sendJSON(
            JSON.stringify(data),
            form.action,
            form.method,
            xhrCB
        );
    };
};
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000', 'GET' ); // get service description | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/', 'GET' ); // get service description | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people | |
// window.CD2.utils.xhr.sendJSON( '{"firstname": "FirstName", "lastname":"LastName"}', 'http://127.0.0.1:8000/people', 'POST' ); // add a new person | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people/id/1', 'GET' ); // get specific person | |
// window.CD2.utils.xhr.sendJSON( '{"firstname": "Lewis", "lastname":"Cowles"}', 'http://127.0.0.1:8000/people/id/1', 'PUT' ); // modify a new person | |
// window.CD2.utils.xhr.sendJSON( '{"firstname": "Another", "lastname":"Person"}', 'http://127.0.0.1:8000/people', 'POST' ); // add a new person | |
// window.CD2.utils.xhr.sendJSON( '{"firstname": "Some", "lastname":"Person"}', 'http://127.0.0.1:8000/people', 'POST' ); // add a new person | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people/id/2', 'DELETE' ); // delete specific person | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'DELETE' ); // delete all people | |
// window.CD2.utils.xhr.sendJSON( null, 'http://127.0.0.1:8000/people', 'GET' ); // get list of people |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest, logging, sys, time, json | |
import app | |
class AppTestCase(unittest.TestCase):
    """End-to-end tests of the people REST API through Flask's test client."""

    def setUp(self):
        """Create a test client and start from an empty people table."""
        app.app.config['TESTING'] = True
        self.app = app.app.test_client()
        self.app.delete('/people')

    def tearDown(self):
        """Nothing to clean up; setUp empties the table before each test."""
        pass

    def checkResponse(self, resp, status='200 OK', mime='application/json'):
        """Return True when resp has the expected status line and mimetype."""
        return status == resp.status and mime == resp.mimetype

    def _assert_response(self, resp, status='200 OK', mime='application/json'):
        """Assert status and mimetype separately for readable failures."""
        self.assertEqual(status, resp.status)
        self.assertEqual(mime, resp.mimetype)

    def _body_text(self, resp):
        """Return the response body as text.

        resp.data is bytes on Python 3, where assertIn with a str needle
        raises TypeError; decoding first works on both Python 2 and 3.
        """
        return resp.get_data(as_text=True)

    def _send_person(self, send, url, firstname, lastname):
        """Send a person as a JSON body with the given client method."""
        body = json.dumps({'firstname': firstname, 'lastname': lastname})
        return send(url, data=body, content_type='application/json')

    def clearCreateMin(self, num):
        """Empty the table, then create records LewisN/CowlesN for 1..num."""
        self.app.delete('/people')  # clear record-set
        for i in range(1, int(num) + 1):
            resp = self._send_person(self.app.post, '/people',
                                     'Lewis%d' % i, 'Cowles%d' % i)
            self._assert_response(resp, '201 CREATED')

    def test_non_exist_operations(self):
        resp = self.app.get('/nonexist')
        self.assertIn('404', resp.status)

    def test_get_operations(self):
        resp = self.app.get('/')
        text = self._body_text(resp)
        for fragment in ('{', '}', '"Message":"Operations List"'):
            self.assertIn(fragment, text)
        self._assert_response(resp)

    def test_multi_inserts(self):
        for i in range(1, 26):
            resp = self._send_person(self.app.post, '/people',
                                     'Lewis%d' % i, 'Cowles%d' % i)
            self._assert_response(resp, '201 CREATED')
        resp = self.app.get('/people')
        self._assert_response(resp)
        text = self._body_text(resp)
        self.assertIn('"Lewis', text)
        self.assertIn('"Cowles', text)

    def test_get_single_record_exists(self):
        self.clearCreateMin(3)
        resp = self.app.get('/people/id/3')
        self._assert_response(resp)
        text = self._body_text(resp)
        self.assertIn('"Lewis3', text)
        self.assertIn('"Cowles3', text)

    def test_update_multi_records(self):
        self.clearCreateMin(11)
        updated_ids = range(3, 10)
        for i in updated_ids:
            resp = self._send_person(self.app.put, '/people/id/%d' % i,
                                     'Updated%d' % i, 'Updaterson%d' % i)
            self._assert_response(resp, '202 ACCEPTED')
        resp = self.app.get('/people')
        self._assert_response(resp)
        text = self._body_text(resp)
        for i in updated_ids:
            self.assertIn('"Updated%d' % i, text)
            self.assertIn('"Updaterson%d' % i, text)

    def test_get_single_record_noexists(self):
        resp = self.app.get('/people/id/9999')
        self._assert_response(resp, '404 NOT FOUND')

    def test_delete_record(self):
        # Create the record first so the delete removes a real row, then
        # confirm it can no longer be fetched.
        self.clearCreateMin(7)
        resp = self.app.delete('/people/id/7')
        self._assert_response(resp, '202 ACCEPTED')
        resp = self.app.get('/people/id/7')
        self._assert_response(resp, '404 NOT FOUND')

    def test_delete_all_records(self):
        resp = self.app.delete('/people')
        self._assert_response(resp, '202 ACCEPTED')
if __name__ == '__main__':
    # Send debug-level log output to stderr, then run the test suite.
    root_logger = logging.getLogger()
    logging.basicConfig(stream=sys.stderr)
    root_logger.setLevel(logging.DEBUG)
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment