Skip to content

Instantly share code, notes, and snippets.

@bhathiya
Last active September 9, 2019 16:59
Show Gist options
  • Save bhathiya/60c98aa6a0cd7e300e26edf8e281e3f5 to your computer and use it in GitHub Desktop.
Save bhathiya/60c98aa6a0cd7e300e26edf8e281e3f5 to your computer and use it in GitHub Desktop.
import ballerina/log;
import ballerina/http;
import ballerina/math;
import ballerina/system;
import ballerina/io;
import ballerina/mysql;
import ballerina/sql;
import ballerina/config;
// Attribute key under which a WebSocket connection keeps its client id.
const string UUID = "UUID";

// HTTP client that broadcast() posts subscriber updates to.
http:Client gwEndpoint = new("http://apim-server:8280/receiver");

// MySQL client shared by every persistence function in this file.
// Each setting is read from configuration and falls back to the default shown.
mysql:Client balDB = new({
    host: config:getAsString("DATABASE_HOST", defaultValue = "localhost"),
    port: config:getAsInt("DATABASE_PORT", defaultValue = 3306),
    name: config:getAsString("DATABASE_NAME", defaultValue = "bal_db"),
    username: config:getAsString("DATABASE_USERNAME", defaultValue = "root"),
    password: config:getAsString("DATABASE_PASSWORD", defaultValue = "root"),
    dbOptions: { useSSL: false }
});
// Row shape used when selecting subscriber ids from the SUBSCRIPTIONS table.
type Client record {
    // Identifier of the subscribed client.
    string id;
};
// Row shape used when selecting from the NOTIFICATIONS table.
type Notification record {
    // Identifier of the notification.
    string id;
    // Notification body stored as text; readers in this file parse it as JSON.
    string notif;
};
// HTTP service exposing CRUD resources for notifications under /backend, plus
// the resource that upgrades an HTTP connection to the WebSocket service.
@http:ServiceConfig {
    basePath: "/backend",
    cors: {
        allowOrigins: ["*"],
        allowCredentials: false,
        // NOTE(review): the DELETE resource below is not listed here - confirm
        // whether cross-origin DELETE requests are meant to be allowed.
        allowMethods: ["GET","POST","PUT"]
    }
}
service NotificationAppUpgrader on new http:Listener(9090) {

    // Returns the notification with the given id, or 404 when none is stored.
    @http:ResourceConfig {
        methods: ["GET"],
        path: "/notificationmanager/notifications/{notificationId}"
    }
    resource function getNotification(http:Caller caller, http:Request req, string notificationId) {
        json notific = getNotificationById(notificationId);
        http:Response response = new;
        if (notific == null) {
            log:printError("Notification : " + notificationId + " cannot be found.");
            response.statusCode = 404;
            response.setJsonPayload({status:"error"});
            var notFoundResult = caller->respond(response);
            if (notFoundResult is error) {
                log:printError("Error sending response", err = notFoundResult);
            }
            // The 404 has been sent; do not fall through and respond again.
            return;
        }
        json res = {status:"ok", data: <json> notific};
        // Set the JSON payload in the outgoing response message.
        response.setJsonPayload(untaint res);
        // Send response to the client.
        var result = caller->respond(response);
        if (result is error) {
            log:printError("Error sending response", err = result);
        }
    }

    // Returns every stored notification.
    @http:ResourceConfig {
        methods: ["GET"],
        path: "/notificationmanager/notifications"
    }
    resource function getAllNotifications(http:Caller caller, http:Request req) {
        http:Response response = new;
        json data = getAllNotifications();
        json payload = {status:"ok", data: data};
        // Set the JSON payload in the outgoing response message.
        response.setJsonPayload(untaint payload);
        // Send response to the client.
        var result = caller->respond(response);
        if (result is error) {
            log:printError("Error sending response", err = result);
        }
    }

    // Stores a new notification under a generated id, broadcasts the refreshed
    // list to list subscribers and answers 201; answers 400 on a non-JSON body.
    @http:ResourceConfig {
        methods: ["POST"],
        path: "/notificationmanager/notifications"
    }
    resource function addNotification(http:Caller caller, http:Request req) {
        http:Response response = new;
        var payload = req.getJsonPayload();
        if (payload is json) {
            string id = system:uuid();
            json jsonPayload = payload;
            jsonPayload.id = id;
            jsonPayload.url = "https://localhost:9090/notifications/" + id;
            insertNotifications(id, jsonPayload);
            json data = getAllNotifications();
            json broadcastMsg = {"type":"data", "data": data, "event":"/notificationmanager/notifications/", "status":"ok"};
            broadcast(broadcastMsg, "/notificationmanager/notifications/");
            // Create response message.
            json res = { status: "ok" };
            response.setJsonPayload(untaint res);
            response.setHeader("Location", "/notifications/" + id);
            response.statusCode = 201;
            // Send response to the client.
            var result = caller->respond(response);
            if (result is error) {
                log:printError("Error sending response", err = result);
            }
        } else {
            response.statusCode = 400;
            response.setPayload("Invalid payload received");
            var result = caller->respond(response);
            if (result is error) {
                log:printError("Error sending response", err = result);
            }
        }
    }

    // Updates the lockState/speed fields of a notification. When the
    // notification does not exist a placeholder one is created. The stored row
    // is replaced and the new state is broadcast to that notification's subscribers.
    @http:ResourceConfig {
        methods: ["POST"],
        path: "/notificationmanager/notifications/{notificationId}"
    }
    resource function updateSpeedAndLockState(http:Caller caller, http:Request req, string notificationId) {
        json res = {status:"ok"};
        http:Response response = new;
        var reqPayload = req.getJsonPayload();
        json jsonPayload = {};
        string locked = "locked";
        int speed = 10;
        if (reqPayload is json) {
            jsonPayload = reqPayload;
            // Load the stored notification, if any.
            json payload = getNotificationById(notificationId);
            if (payload == null) {
                // Nothing stored yet: build a placeholder, defaulting missing fields.
                locked = jsonPayload.lockState==null ? "locked" : jsonPayload.lockState.toString();
                speed = jsonPayload.speed==null ? 10 : <int> jsonPayload.speed;
                payload = {"name":"placeholder", "id":notificationId, "title":"Here you go",
                    "uri":"/notificationmanager/notifications/" + notificationId, "content":"no new content given",
                    "lockState": locked, "speed": speed};
            } else {
                // Fields absent from the request keep their stored values.
                locked = jsonPayload.lockState==null ? payload.lockState.toString() : jsonPayload.lockState.toString();
                speed = jsonPayload.speed==null ? <int> payload.speed : <int> jsonPayload.speed;
                payload.lockState = locked;
                payload.speed = speed;
            }
            // Replace the stored row: delete first, then insert the new state.
            removeNotification(notificationId);
            insertNotifications(notificationId, payload);
            json broadcastMsg = {"status": "ok", "type": "data", "event":"/notificationmanager/notifications/"
                + notificationId};
            broadcastMsg.data = json.convert(payload);
            broadcast(broadcastMsg, "/notificationmanager/notifications/" + notificationId);
            res = {status:"ok"};
        } else {
            res = {status:"error"};
        }
        // Set the JSON payload in the outgoing response message.
        response.setJsonPayload(untaint res);
        // Send response to the client.
        var result = caller->respond(response);
        if (result is error) {
            log:printError("Error sending response", err = result);
        }
    }

    // Replaces an existing notification with the request body. Answers
    // {status:"error"} when the id is unknown and 400 on a non-JSON body.
    @http:ResourceConfig {
        methods: ["PUT"],
        path: "/notificationmanager/notifications/{notificationId}"
    }
    resource function updateNotification(http:Caller caller, http:Request req, string notificationId) {
        var updatedNotification = req.getJsonPayload();
        http:Response response = new;
        if (updatedNotification is json) {
            // Find the notification that needs to be updated.
            json existingNotification = getNotificationById(notificationId);
            json res;
            if (existingNotification != null) {
                updatedNotification.id = notificationId;
                updatedNotification.url = "https://localhost:9090/notifications/" + notificationId;
                // Delete the stored row before inserting, as updateSpeedAndLockState
                // does; otherwise the INSERT targets an id that already exists.
                removeNotification(notificationId);
                insertNotifications(notificationId, updatedNotification);
                // NOTE(review): the message's "event" field names the list event while
                // it is broadcast on the per-notification event - confirm which is intended.
                json broadcastMsg = {"type":"data", "data": updatedNotification, "event":"/notificationmanager/notifications/", "status":"ok"};
                broadcast(broadcastMsg, "/notificationmanager/notifications/" + notificationId);
                res = {status:"ok"};
            } else {
                res = {status:"error"};
            }
            // Set the JSON payload to the outgoing response message to the client.
            response.setJsonPayload(untaint res);
            // Send response to the client.
            var result = caller->respond(response);
            if (result is error) {
                log:printError("Error sending response", err = result);
            }
        } else {
            response.statusCode = 400;
            response.setPayload("Invalid payload received");
            var result = caller->respond(response);
            if (result is error) {
                log:printError("Error sending response", err = result);
            }
        }
    }

    // Deletes a notification and broadcasts the refreshed list to list subscribers.
    @http:ResourceConfig {
        methods: ["DELETE"],
        path: "/notificationmanager/notifications/{notificationId}"
    }
    resource function removeNotification(http:Caller caller, http:Request req, string notificationId) {
        http:Response response = new;
        // Remove the requested notification from the database.
        removeNotification(notificationId);
        json payload = {status:"ok"};
        response.setJsonPayload(untaint payload);
        // Broadcast the remaining notifications on the list event. Looking up the
        // id that was just deleted would only ever yield null.
        json data = getAllNotifications();
        json broadcastMsg = {"type":"data", "data": data, "event":"/notificationmanager/notifications/", "status":"ok"};
        broadcast(broadcastMsg, "/notificationmanager/notifications/");
        // Send response to the client.
        var result = caller->respond(response);
        if (result is error) {
            log:printError("Error sending response", err = result);
        }
    }

    // Resource to upgrade from HTTP to WebSocket
    @http:ResourceConfig {
        webSocketUpgrade: {
            upgradePath: "/",
            upgradeService: NotificationServerApp
        }
    }
    resource function upgrader(http:Caller caller, http:Request req) {
        map<string> headers = {};
        http:WebSocketCaller wsEp = caller->acceptWebSocketUpgrade(headers);
        log:printInfo("A new client connected!");
    }
}
// WebSocket service handling subscribe / unsubscribe / close requests.
// Each text frame is parsed as a JSON envelope whose "ClientId", "Content" and
// optional "Close" fields are read below.
service NotificationServerApp = @http:WebSocketServiceConfig service {

    // Triggered for every text frame received on the connection.
    resource function onText(http:WebSocketCaller caller, string text) {
        string clientId;
        // Parse the envelope; frames that are not JSON are ignored.
        io:StringReader sr = new(text, encoding = "UTF-8");
        json msg = {};
        boolean close = false;
        json|error ret = sr.readJson();
        if (ret is error) {
            return;
        } else {
            msg = ret.Content;
            if (ret.Close !== null && ret.Close is boolean ) {
                close = <boolean> ret.Close;
            }
            // Frames without a client id are ignored.
            if (ret.ClientId == null ) {
                return;
            } else {
                clientId = ret.ClientId.toString();
            }
        }

        // Close request: drop all subscriptions, then close the connection.
        if (close) {
            unsubscribeFromAll(clientId);
            var closeResult = caller->close(statusCode = 1000, reason = "Client requested closure.", timeoutInSecs = 5);
            if (closeResult is error) {
                log:printError("Error closing connection", err = closeResult);
            }
            return;
        }

        if (msg == null) {
            return;
        }

        if (msg["type"] !== null && msg["event"] !== null && msg["type"].toString() == "subscribe") {
            // Subscribe: acknowledge, persist the subscription, then push current data.
            string event = msg["event"].toString();
            json res = {"status":"ok", "type":"subscribe", "event": event};
            var err = caller->pushText(res.toString());
            if (err is error) {
                log:printError("Error sending message", err = err);
            }
            insertSubscriptions(event, clientId);
            json data = [];
            if (event == "/notificationmanager/notifications/") {
                data = getAllNotifications();
            } else {
                // Take the id that follows the prefix. The length check avoids an
                // index-out-of-range panic when the event lacks the prefix.
                string[] parts = event.split("/notificationmanager/notifications/");
                if (parts.length() > 1) {
                    data = getNotificationById(parts[1]);
                } else {
                    log:printError("Unrecognised subscription event: " + event);
                }
            }
            json res2 = {"status":"ok", "type":"data", "event": event, "data": data};
            err = caller->pushText(res2.toString());
            if (err is error) {
                log:printError("Error sending data", err = err);
            }
            // Remember the client id on the connection so onClose can clean up.
            caller.attributes[UUID] = clientId;
            log:printInfo("Subcsribed: " + clientId + " to: " + event);
        } else if (msg["type"] !== null && msg["event"] !== null && msg["type"].toString() == "unsubscribe") {
            // Unsubscribe: remove the single subscription and acknowledge.
            unsubscribe(clientId, msg["event"].toString());
            json res2 = {"status":"ok"};
            error? err = caller->pushText(res2.toString());
            if (err is error) {
                log:printError("Error sending data", err = err);
            }
        }
    }

    // Triggered when an existing connection closes; removes the client's subscriptions.
    resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
        // Read the attribute once; it is nil when the client never subscribed.
        string? clientId = getAttributeStr(caller, UUID);
        string suffix = "-";
        if (clientId is string) {
            suffix = ": " + clientId;
        }
        log:printInfo("close" + suffix);
        unsubscribeFromAll(clientId);
        log:printInfo("Client disconnected" + suffix);
    }
};
// Removes every subscription held by the given client.
// Does nothing when clientId is nil.
function unsubscribeFromAll(string? clientId) {
    if (clientId is string) {
        string message = "Unsubcsribed: " + clientId + " from all events.";
        removeSubscriptions(clientId);
        log:printInfo(message);
    }
}
// Removes the given client's subscription to a single event and logs it.
function unsubscribe(string clientId, string event) {
    string message = "Unsubcsribed: " + clientId + " from: " + event;
    removeSubscription(event, clientId);
    log:printInfo(message);
}
// Posts `text` to the gateway endpoint once per client subscribed to `event`.
// Each request body carries the subscriber id ("ClientId") and the message
// ("Content"). Send failures are logged and do not stop the remaining sends.
function broadcast(json text, string event) {
    foreach string subscriber in getClientsBySubscription(event) {
        log:printInfo("Publishing to subcsriber: " + subscriber);
        json envelope = {"ClientId": subscriber, "Content": text };
        http:Request gatewayRequest = new;
        gatewayRequest.setPayload(untaint envelope);
        var outcome = gwEndpoint->post("/", gatewayRequest);
        if (outcome is error) {
            log:printError("Error sending update to the gateway.", err = outcome);
        }
    }
}
// Looks up `key` in the attributes of a WebSocket connection and returns the
// value cast to string, or nil.
function getAttributeStr(http:WebSocketCaller ep, string key) returns (string|()) {
    any attribute = ep.attributes[key];
    return <string?>attribute;
}
// Records in the SUBSCRIPTIONS table that client `id` is subscribed to `event`.
// A database failure is logged; nothing is reported back to the caller.
public function insertSubscriptions(string event, string id) {
    string sqlString = "INSERT INTO SUBSCRIPTIONS (event, id) VALUES (?,?)";
    // Parameterized insert through the shared SQL client.
    var ret = balDB->update(sqlString, event, id);
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in data update", err = ret);
    }
}
// Deletes every SUBSCRIPTIONS row belonging to client `id`.
// A database failure is logged; nothing is reported back to the caller.
public function removeSubscriptions(string id) {
    string sqlString = "DELETE FROM SUBSCRIPTIONS WHERE id = ?";
    // Parameterized delete through the shared SQL client.
    var ret = balDB->update(sqlString, id);
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in subscription removal", err = ret);
    }
}
// Deletes the SUBSCRIPTIONS row linking client `id` to `event`.
// A database failure is logged; nothing is reported back to the caller.
public function removeSubscription(string event, string id) {
    string sqlString = "DELETE FROM SUBSCRIPTIONS WHERE event = ? AND id = ?";
    // Parameterized delete through the shared SQL client.
    var ret = balDB->update(sqlString, event, id);
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in subscription removal", err = ret);
    }
}
// Returns the ids of all clients subscribed to `event`.
// On a database error the error is logged and an empty array is returned.
public function getClientsBySubscription(string event) returns (string[]) {
    string[] subscriberIds = [];
    string query = "SELECT id FROM SUBSCRIPTIONS WHERE event = ?";
    // Map each result row onto the Client record type.
    var rows = balDB->select(query, Client, event);
    if (rows is table<Client>) {
        foreach Client row in rows {
            // Append at the current end of the array.
            subscriberIds[subscriberIds.length()] = row.id;
        }
    } else {
        log:printError("Error occurred in data retrieval", err = rows);
    }
    return subscriberIds;
}
// Stores notification `notif`, serialized with toString(), under `id` in the
// NOTIFICATIONS table.
// A database failure is logged; nothing is reported back to the caller.
public function insertNotifications(string id, json notif) {
    string sqlString = "INSERT INTO NOTIFICATIONS (id, notifs) VALUES (?, ?)";
    // Parameterized insert through the shared SQL client.
    var ret = balDB->update(sqlString, id, notif.toString());
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in data update", err = ret);
    }
}
// Deletes the NOTIFICATIONS row with the given id.
// A database failure is logged; nothing is reported back to the caller.
public function removeNotification(string id) {
    string sqlString = "DELETE FROM NOTIFICATIONS where id = ?";
    // Parameterized delete through the shared SQL client.
    var ret = balDB->update(sqlString, id);
    if (ret is error) {
        // Log the error for the service maintainers.
        log:printError("Error occurred in notification removal", err = ret);
    }
}
// Loads the notification stored under `id` and returns it parsed as JSON.
// Returns null when no row matches, when the stored text is not valid JSON,
// or when the query fails (the failure is logged).
public function getNotificationById(string id) returns (json) {
    string sqlString = "SELECT * FROM NOTIFICATIONS WHERE id = ?";
    json notification = null;
    var ret = balDB->select(sqlString, Notification, id);
    if (ret is table<Notification>) {
        // Only the first row is used, but the loop deliberately runs to the end:
        // leaving the foreach early would leave the result table partly consumed
        // and unclosed.
        boolean firstRowSeen = false;
        foreach Notification nt in ret {
            if (!firstRowSeen) {
                firstRowSeen = true;
                io:StringReader sr = new(nt.notif, encoding = "UTF-8");
                var jsonNotif = sr.readJson();
                if (jsonNotif is json) {
                    notification = jsonNotif;
                } else {
                    log:printError("Stored notification " + id + " is not valid JSON", err = jsonNotif);
                }
            }
        }
    } else {
        log:printError("Error occurred in data retrieval", err = ret);
    }
    return notification;
}
// Loads every row of the NOTIFICATIONS table and returns the parsed
// notifications as a JSON array. Rows whose stored text does not parse as
// JSON are skipped. On a query error the error is logged and the array
// is returned empty.
public function getAllNotifications() returns (json) {
    json parsedNotifications = [];
    int nextIndex = 0;
    var rows = balDB->select("SELECT * FROM NOTIFICATIONS", Notification);
    if (rows is table<Notification>) {
        foreach Notification row in rows {
            io:StringReader reader = new(row.notif, encoding = "UTF-8");
            var parsed = reader.readJson();
            if (parsed is json) {
                parsedNotifications[nextIndex] = parsed;
                nextIndex += 1;
            }
        }
    } else {
        log:printError("Error occurred in data retrieval", err = rows);
    }
    return parsedNotifications;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment