Last active
September 9, 2019 16:59
-
-
Save bhathiya/60c98aa6a0cd7e300e26edf8e281e3f5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ballerina/log; | |
import ballerina/http; | |
import ballerina/math; | |
import ballerina/system; | |
import ballerina/io; | |
import ballerina/mysql; | |
import ballerina/sql; | |
import ballerina/config; | |
// Attribute key under which a WebSocket connection keeps the id of the
// client that subscribed through it (written in onText, read in onClose).
const string UUID = "UUID";

// HTTP client that broadcast() uses to post subscriber updates to the gateway.
http:Client gwEndpoint = new("http://apim-server:8280/receiver");

// MySQL client shared by all persistence helpers in this file.
// Every connection setting is read from configuration and falls back to the
// default given here when the key is not configured.
mysql:Client balDB = new({
    host: config:getAsString("DATABASE_HOST", defaultValue = "localhost"),
    port: config:getAsInt("DATABASE_PORT", defaultValue = 3306),
    name: config:getAsString("DATABASE_NAME", defaultValue = "bal_db"),
    username: config:getAsString("DATABASE_USERNAME", defaultValue = "root"),
    password: config:getAsString("DATABASE_PASSWORD", defaultValue = "root"),
    dbOptions: { useSSL: false }
});
// Row shape used when selecting subscriber ids from the SUBSCRIPTIONS table.
type Client record {
    // Id of the subscribed client.
    string id;
};

// Row shape used when selecting from the NOTIFICATIONS table.
type Notification record {
    // Id of the notification.
    string id;
    // Notification body; the readers in this file parse it as JSON text.
    // NOTE(review): insertNotifications() writes to a column named `notifs`
    // while this field is named `notif` - confirm how result columns are
    // mapped onto record fields.
    string notif;
};
@http:ServiceConfig { | |
basePath: "/backend", | |
cors: { | |
allowOrigins: ["*"], | |
allowCredentials: false, | |
allowMethods: ["GET","POST","PUT"] | |
} | |
} | |
service NotificationAppUpgrader on new http:Listener(9090) { | |
// GET /notificationmanager/notifications/{notificationId}
// Responds with {status:"ok", data:<notification>} when the notification exists,
// or with HTTP 404 and {status:"error"} when it does not.
// Exactly one response is sent per request: the original fell through after the
// 404 response and tried to respond a second time with status "ok".
@http:ResourceConfig {
    methods: ["GET"],
    path: "/notificationmanager/notifications/{notificationId}"
}
resource function getNotification(http:Caller caller, http:Request req, string notificationId) {
    json notific = getNotificationById(notificationId);
    http:Response response = new;
    if (notific == null) {
        log:printError("Notification : " + notificationId + " cannot be found.");
        response.statusCode = 404;
        response.setJsonPayload({status:"error"});
    } else {
        json res = {status:"ok", data: notific};
        // Set the JSON payload in the outgoing response message.
        response.setJsonPayload(untaint res);
    }
    // Send the single response to the client.
    var result = caller->respond(response);
    if (result is error) {
        log:printError("Error sending response", err = result);
    }
}
// GET /notificationmanager/notifications
// Responds with {status:"ok", data:<array of all stored notifications>}.
@http:ResourceConfig {
    methods: ["GET"],
    path: "/notificationmanager/notifications"
}
resource function getAllNotifications(http:Caller caller, http:Request req) {
    http:Response response = new;
    // Calls the module-level getAllNotifications() helper, not this resource.
    json data = getAllNotifications();
    json payload = {status:"ok", data: data};
    // Set the JSON payload in the outgoing response message.
    response.setJsonPayload(untaint payload);
    // Send response to the client.
    var result = caller->respond(response);
    if (result is error) {
        log:printError("Error sending response", err = result);
    }
}
// POST /notificationmanager/notifications
// Stores the JSON request body as a new notification under a generated UUID,
// broadcasts the refreshed notification list to subscribers of the list event,
// and responds 201 with {status:"ok"} and a Location header.
// Responds 400 with a plain-text message when the body is not valid JSON.
@http:ResourceConfig {
    methods: ["POST"],
    path: "/notificationmanager/notifications"
}
resource function addNotification(http:Caller caller, http:Request req) {
    http:Response response = new;
    var payload = req.getJsonPayload();
    if (payload is json) {
        string id = system:uuid();
        json jsonPayload = payload;
        jsonPayload.id = id;
        // NOTE(review): this URL and the Location header below use
        // "/notifications/{id}", while the routes of this service live under
        // "/backend/notificationmanager/notifications" - confirm this is intended.
        jsonPayload.url = "https://localhost:9090/notifications/" + id;
        insertNotifications(id, jsonPayload);

        // Push the full, refreshed list to everyone subscribed to the list event.
        json data = getAllNotifications();
        json broadcastMsg = {"type":"data", "data": data, "event":"/notificationmanager/notifications/", "status":"ok"};
        broadcast(broadcastMsg, "/notificationmanager/notifications/");

        // Create response message.
        json res = { status: "ok" };
        response.setJsonPayload(untaint res);
        response.setHeader("Location", "/notifications/" + id);
        response.statusCode = 201;
    } else {
        response.statusCode = 400;
        response.setPayload("Invalid payload received");
    }
    // Send response to the client.
    var result = caller->respond(response);
    if (result is error) {
        log:printError("Error sending response", err = result);
    }
}
// POST /notificationmanager/notifications/{notificationId}
// Updates the "lockState" and "speed" fields of a notification from the JSON
// request body. Fields missing from the request keep their stored value; when
// the notification does not exist yet, a placeholder notification is created
// with defaults "locked" / 10. The stored row is replaced and the resulting
// notification is broadcast to subscribers of this notification's event.
// Responds {status:"ok"} on success, or HTTP 400 with {status:"error"} when the
// body is not valid JSON (matching the 400 used by the other write resources).
@http:ResourceConfig {
    methods: ["POST"],
    path: "/notificationmanager/notifications/{notificationId}"
}
resource function updateSpeedAndLockState(http:Caller caller, http:Request req, string notificationId) {
    json res = {status:"ok"};
    http:Response response = new;
    var reqPayload = req.getJsonPayload();
    if (reqPayload is json) {
        json jsonPayload = reqPayload;
        // Load the currently stored notification, if any.
        json payload = getNotificationById(notificationId);
        if (payload == null) {
            // Nothing stored yet: build a placeholder, using defaults for missing fields.
            string locked = jsonPayload.lockState==null ? "locked" : jsonPayload.lockState.toString();
            // The cast assumes "speed" is an integer when present.
            int speed = jsonPayload.speed==null ? 10 : <int> jsonPayload.speed;
            payload = {"name":"placeholder", "id":notificationId, "title":"Here you go",
                "uri":"/notificationmanager/notifications/" + notificationId, "content":"no new content given",
                "lockState": locked, "speed": speed};
        } else {
            // Overlay the requested values on the stored ones.
            string locked = jsonPayload.lockState==null ? payload.lockState.toString() : jsonPayload.lockState.toString();
            int speed = jsonPayload.speed==null ? <int> payload.speed : <int> jsonPayload.speed;
            payload.lockState = locked;
            payload.speed = speed;
        }
        // Replace the stored row: delete first, then insert the new version.
        removeNotification(notificationId);
        insertNotifications(notificationId, payload);
        json broadcastMsg = {"status": "ok", "type": "data", "event":"/notificationmanager/notifications/"
            + notificationId};
        broadcastMsg.data = json.convert(payload);
        broadcast(broadcastMsg, "/notificationmanager/notifications/" + notificationId);
    } else {
        // The request body could not be read as JSON.
        response.statusCode = 400;
        res = {status:"error"};
    }
    // Set the JSON payload in the outgoing response message.
    response.setJsonPayload(untaint res);
    // Send response to the client.
    var result = caller->respond(response);
    if (result is error) {
        log:printError("Error sending response", err = result);
    }
}
// PUT /notificationmanager/notifications/{notificationId}
// Replaces an existing notification with the JSON request body and broadcasts
// the new version to subscribers of this notification's event.
// Responses: {status:"ok"} on success; 404 with {status:"error"} when the
// notification does not exist; 400 with a text message when the body is not JSON.
//
// Fixes relative to the previous version:
//  - the old row is removed before the insert (as updateSpeedAndLockState does),
//    instead of inserting a second row with the same id;
//  - the "event" field of the broadcast message now names the event the message
//    is actually broadcast to;
//  - an unknown notification yields 404 rather than 200.
@http:ResourceConfig {
    methods: ["PUT"],
    path: "/notificationmanager/notifications/{notificationId}"
}
resource function updateNotification(http:Caller caller, http:Request req, string notificationId) {
    var updatedNotification = req.getJsonPayload();
    http:Response response = new;
    if (updatedNotification is json) {
        // Find the notification that needs to be updated.
        json existingNotification = getNotificationById(notificationId);
        json res;
        if (existingNotification != null) {
            updatedNotification.id = notificationId;
            updatedNotification.url = "https://localhost:9090/notifications/" + notificationId;
            // Replace the stored row: delete first, then insert the new version.
            removeNotification(notificationId);
            insertNotifications(notificationId, updatedNotification);
            string event = "/notificationmanager/notifications/" + notificationId;
            json broadcastMsg = {"type":"data", "data": updatedNotification, "event": event, "status":"ok"};
            broadcast(broadcastMsg, event);
            res = {status:"ok"};
        } else {
            response.statusCode = 404;
            res = {status:"error"};
        }
        // Set the JSON payload to the outgoing response message to the client.
        response.setJsonPayload(untaint res);
    } else {
        response.statusCode = 400;
        response.setPayload("Invalid payload received");
    }
    // Send response to the client.
    var result = caller->respond(response);
    if (result is error) {
        log:printError("Error sending response", err = result);
    }
}
// DELETE /notificationmanager/notifications/{notificationId}
// Deletes the notification, broadcasts the refreshed notification list to
// subscribers of the list event, and responds with {status:"ok"}.
// The previous version broadcast getNotificationById() of the row it had just
// deleted, so list subscribers always received null data; the list event now
// carries the remaining notifications, as addNotification does.
@http:ResourceConfig {
    methods: ["DELETE"],
    path: "/notificationmanager/notifications/{notificationId}"
}
resource function removeNotification(http:Caller caller, http:Request req, string notificationId) {
    http:Response response = new;
    // Calls the module-level removeNotification() helper, not this resource.
    removeNotification(notificationId);
    json payload = {status:"ok"};
    response.setJsonPayload(untaint payload);

    // Tell list subscribers what is left after the deletion.
    json data = getAllNotifications();
    json broadcastMsg = {"type":"data", "data": data, "event":"/notificationmanager/notifications/", "status":"ok"};
    broadcast(broadcastMsg, "/notificationmanager/notifications/");

    // Send response to the client.
    var result = caller->respond(response);
    if (result is error) {
        log:printError("Error sending response", err = result);
    }
}
// Upgrades an incoming HTTP request on "/" to a WebSocket connection that is
// then served by NotificationServerApp.
@http:ResourceConfig {
    webSocketUpgrade: {
        upgradePath: "/",
        upgradeService: NotificationServerApp
    }
}
resource function upgrader(http:Caller caller, http:Request req) {
    // No custom headers are passed along with the upgrade.
    map<string> upgradeHeaders = {};
    http:WebSocketCaller wsCaller = caller->acceptWebSocketUpgrade(upgradeHeaders);
    log:printInfo("A new client connected!");
}
} | |
service NotificationServerApp = @http:WebSocketServiceConfig service { | |
// Triggered when a text frame arrives on the WebSocket connection.
// The frame is expected to be a JSON envelope with these fields:
//   ClientId - required; frames without it are ignored
//   Close    - optional boolean; when true the client is unsubscribed from
//              everything and the connection is closed
//   Content  - the message itself, carrying "type" ("subscribe" or
//              "unsubscribe") and "event"
// On "subscribe" the caller receives an acknowledgement followed by the current
// data for the event; on "unsubscribe" it receives {"status":"ok"}.
resource function onText(http:WebSocketCaller caller, string text) {
    string clientId;
    // Parse the incoming envelope.
    io:StringReader sr = new(text, encoding = "UTF-8");
    json msg = {};
    boolean close = false;
    json|error ret = sr.readJson();
    if (ret is error) {
        log:printError("Error parsing incoming message", err = ret);
        return;
    } else {
        msg = ret.Content;
        if (ret.Close !== null && ret.Close is boolean ) {
            close = <boolean> ret.Close;
        }
        if (ret.ClientId == null ) {
            return;
        } else {
            clientId = ret.ClientId.toString();
        }
    }

    // Close request: drop every subscription of this client, then close.
    if (close) {
        unsubscribeFromAll(clientId);
        var r = caller->close(statusCode = 1000, reason = "Client requested closure.", timeoutInSecs = 5);
        if (r is error) {
            log:printError("Error closing connection", err = r);
        }
        return;
    }

    if (msg == null) {
        return;
    }

    if (msg["type"] !== null && msg["event"] !== null && msg["type"].toString() == "subscribe") {
        // Subscribe: acknowledge, record the subscription, then send current data.
        string event = msg["event"].toString();
        json res = {"status":"ok", "type":"subscribe", "event": event};
        var err = caller->pushText(res.toString());
        if (err is error) {
            log:printError("Error sending message", err = err);
        }
        insertSubscriptions(event, clientId);

        string listEvent = "/notificationmanager/notifications/";
        json data = null;
        if (event == listEvent) {
            data = getAllNotifications();
        } else {
            // The notification id is whatever follows the list prefix. The event
            // name comes from the client, so check the split result before
            // indexing: an event without the prefix used to fail with an
            // index-out-of-range error here.
            string[] parts = event.split(listEvent);
            if (parts.length() > 1) {
                data = getNotificationById(parts[1]);
            } else {
                log:printError("Cannot extract a notification id from event: " + event);
            }
        }
        json res2 = {"status":"ok", "type":"data", "event": event, "data": data};
        err = caller->pushText(res2.toString());
        if (err is error) {
            log:printError("Error sending data", err = err);
        }
        // Remember the client id on the connection so onClose can clean up.
        caller.attributes[UUID] = clientId;
        log:printInfo("Subcsribed: " + clientId + " to: " + event);
    } else if (msg["type"] !== null && msg["event"] !== null && msg["type"].toString() == "unsubscribe") {
        // Unsubscribe from a single event.
        unsubscribe(clientId, msg["event"].toString());
        json res2 = {"status":"ok"};
        error? err = caller->pushText(res2.toString());
        if (err is error) {
            log:printError("Error sending data", err = err);
        }
    }
}
// Triggered when an existing WebSocket connection closes.
// Logs the event, removes every subscription of the client id stored on the
// connection (if one was stored), and logs the disconnect.
resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
    // Look the client id up once; it is absent if the client never subscribed.
    string? clientId = getAttributeStr(caller, UUID);
    string suffix = "-";
    if (clientId is string) {
        suffix = ": " + clientId;
    }
    log:printInfo("close" + suffix);
    unsubscribeFromAll(clientId);
    log:printInfo("Client disconnected" + suffix);
}
}; | |
// Unsubscribes the given client from all events.
// Does nothing when no client id is supplied.
function unsubscribeFromAll(string? clientId) {
    if (clientId is string) {
        string ownerId = clientId;
        removeSubscriptions(ownerId);
        string message = "Unsubcsribed: " + ownerId + " from all events.";
        log:printInfo(message);
    }
}
// Unsubscribes the given client from a single event and logs the removal.
function unsubscribe(string clientId, string event) {
    removeSubscription(event, clientId);
    string message = "Unsubcsribed: " + clientId + " from: " + event;
    log:printInfo(message);
}
// Sends `text` to every client subscribed to `event`.
// For each subscriber id a {"ClientId", "Content"} envelope is posted to the
// gateway endpoint; a failed post is logged and the remaining subscribers are
// still processed.
function broadcast(json text, string event) {
    foreach string clientId in getClientsBySubscription(event) {
        log:printInfo("Publishing to subcsriber: " + clientId);
        json envelope = {"ClientId": clientId, "Content": text };
        http:Request outbound = new;
        outbound.setPayload(untaint envelope);
        var reply = gwEndpoint->post("/", outbound);
        if (reply is error) {
            log:printError("Error sending update to the gateway.", err = reply);
        }
    }
}
// Reads the attribute stored under `key` on a WebSocket connection and returns
// it cast to an optional string.
function getAttributeStr(http:WebSocketCaller ep, string key) returns (string|()) {
    var attribute = ep.attributes[key];
    return <string?> attribute;
}
// Records that client `id` is subscribed to `event` by inserting a row into the
// SUBSCRIPTIONS table. A database failure is logged; nothing is reported back
// to the caller.
public function insertSubscriptions(string event, string id) {
    string sqlString = "INSERT INTO SUBSCRIPTIONS (event, id) VALUES (?,?)";
    // Parameterised insert through the SQL client's update action.
    var ret = balDB->update(sqlString, event, id);
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in data update", err = ret);
    }
}
// Deletes every SUBSCRIPTIONS row that belongs to client `id`.
// A database failure is logged; nothing is reported back to the caller.
public function removeSubscriptions(string id) {
    string sqlString = "DELETE FROM SUBSCRIPTIONS WHERE id = ?";
    // Parameterised delete through the SQL client's update action.
    var ret = balDB->update(sqlString, id);
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in subscription removal", err = ret);
    }
}
// Deletes the SUBSCRIPTIONS row(s) linking client `id` to `event`.
// A database failure is logged; nothing is reported back to the caller.
public function removeSubscription(string event, string id) {
    string sqlString = "DELETE FROM SUBSCRIPTIONS WHERE event = ? AND id = ?";
    // Parameterised delete through the SQL client's update action.
    var ret = balDB->update(sqlString, event, id);
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in subscription removal", err = ret);
    }
}
// Returns the ids of all clients subscribed to `event`, read from the
// SUBSCRIPTIONS table. When the query fails the error is logged and an empty
// array is returned.
public function getClientsBySubscription(string event) returns (string[]) {
    string[] subscriberIds = [];
    string query = "SELECT id FROM SUBSCRIPTIONS WHERE event = ?";
    var rows = balDB->select(query, Client, event);
    if (rows is table<Client>) {
        // Append each id at the next free index.
        int next = 0;
        foreach Client row in rows {
            subscriberIds[next] = row.id;
            next += 1;
        }
    } else {
        log:printError("Error occurred in data retrieval", err = rows);
    }
    return subscriberIds;
}
// Stores a notification: inserts a NOTIFICATIONS row holding `id` and the
// string form of `notif`. A database failure is logged; nothing is reported
// back to the caller.
public function insertNotifications(string id, json notif) {
    string sqlString = "INSERT INTO NOTIFICATIONS (id, notifs) VALUES (?, ?)";
    // Parameterised insert through the SQL client's update action.
    var ret = balDB->update(sqlString, id, notif.toString());
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in data update", err = ret);
    }
}
// Deletes the NOTIFICATIONS row(s) with the given `id`.
// A database failure is logged; nothing is reported back to the caller.
public function removeNotification(string id) {
    string sqlString = "DELETE FROM NOTIFICATIONS where id = ?";
    // Parameterised delete through the SQL client's update action.
    var ret = balDB->update(sqlString, id);
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in notification removal", err = ret);
    }
}
// Looks up a notification by id and returns its stored body parsed as JSON.
// Returns null when no row matches, when the stored body is not valid JSON, or
// when the query fails (the last two cases are logged).
// Only the first matching row is used. The result table is nevertheless
// iterated to the end instead of leaving the loop early.
// NOTE(review): this assumes a table that is not fully consumed keeps its
// database connection until it is closed - confirm against the SQL client docs.
public function getNotificationById(string id) returns (json) {
    string sqlString = "SELECT * FROM NOTIFICATIONS WHERE id = ?";
    json found = null;
    var ret = balDB->select(sqlString, Notification, id);
    if (ret is table<Notification>) {
        boolean firstRow = true;
        foreach Notification nt in ret {
            if (firstRow) {
                firstRow = false;
                io:StringReader sr = new(nt.notif, encoding = "UTF-8");
                var jsonNotif = sr.readJson();
                if (jsonNotif is json) {
                    found = jsonNotif;
                } else {
                    log:printError("Stored notification " + id + " is not valid JSON", err = jsonNotif);
                }
            }
        }
    } else {
        log:printError("Error occurred in data retrieval", err = ret);
    }
    return found;
}
// Returns a JSON array holding the parsed body of every row in NOTIFICATIONS.
// Rows whose body cannot be parsed as JSON are skipped. When the query fails
// the error is logged and an empty array is returned.
public function getAllNotifications() returns (json) {
    json collected = [];
    var rows = balDB->select("SELECT * FROM NOTIFICATIONS", Notification);
    if (rows is table<Notification>) {
        // Index of the next free slot in the result array.
        int next = 0;
        foreach Notification row in rows {
            io:StringReader bodyReader = new(row.notif, encoding = "UTF-8");
            var parsed = bodyReader.readJson();
            if (parsed is json) {
                collected[next] = parsed;
                next += 1;
            }
        }
    } else {
        log:printError("Error occurred in data retrieval", err = rows);
    }
    return collected;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment