Created
May 22, 2019 09:11
-
-
Save hughjfchen/d661b5670987ed8ef8fa5e6c03498ea4 to your computer and use it in GitHub Desktop.
code to mix mqtt and api within a transient program
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE TypeOperators #-} | |
module Main | |
(main | |
) where | |
import Protolude hiding (async, local) | |
import Data.String | |
import Data.Aeson | |
import Transient.Internals | |
import Transient.Move | |
import Transient.Move.Utils | |
import Transient.Move.Internals | |
import Transient.Move.Services | |
import Transient.Indeterminism | |
import Control.Applicative | |
import Transient.Logged | |
import Control.Concurrent(threadDelay) | |
import Control.Monad.IO.Class | |
import qualified Data.ByteString.Lazy.Char8 as BS | |
import qualified Data.ByteString as BSS | |
import qualified Data.Text as DT | |
import Network.MQTT.Client | |
deleteRESTReq :: String -> Int -> String | |
deleteRESTReq obj objId = "DELETE /" ++ obj ++ "?id=eq." ++ (show objId) ++" HTTP/1.1\r\n" | |
<> "Host: $hostnode\r\n" | |
<> "User-Agent: transient/0.5.2\r\n" | |
<> "Accept: */*\r\n" | |
<> "Connection: close\r\n" | |
<> "\r\n" | |
getRESTReq :: String -> Int -> String | |
getRESTReq obj objId= "GET /" ++ obj ++ "?id=eq." ++ (show objId) ++" HTTP/1.1\r\n" | |
<> "Host: $hostnode\r\n" | |
<> "User-Agent: transient/0.5.2\r\n" | |
<> "Accept: */*\r\n" | |
<> "Connection: close\r\n" | |
<> "\r\n" | |
postRESTReq :: String -> Int -> String -> String | |
postRESTReq obj payloadLength payload= "POST /" ++ obj ++ " HTTP/1.1\r\n" | |
<> "Host: $hostnode\r\n" | |
<> "User-Agent: transient/0.5.2\r\n" | |
<> "Accept: */*\r\n" | |
<> "Connection: close\r\n" | |
<> "Content-Type: application/json\r\n" | |
<> "Content-Length: " ++ (show payloadLength) ++ "\r\n\r\n" | |
<> payload | |
patchRESTReq :: String -> Int -> Int -> String -> String | |
patchRESTReq obj objId payloadLength payload = "PATCH /" ++ obj ++ "?id=eq." ++ (show objId) ++ " HTTP/1.1\r\n" | |
<> "Host: $hostnode\r\n" | |
<> "User-Agent: transient/0.5.2\r\n" | |
<> "Accept: */*\r\n" | |
<> "Connection: close\r\n" | |
<> "Content-Type: application/json\r\n" | |
<> "Content-Length: " ++ (show payloadLength) ++ "\r\n\r\n" | |
<> payload | |
postRestService :: String -> String -> String -> Int -> String -> [(Package, Program)] | |
postRestService host port obj payloadLength payload = let pReq = postRESTReq obj payloadLength payload | |
in | |
[("type","HTTP") | |
,("nodehost",host) | |
,("nodeport",port),("HTTPstr",pReq)] | |
patchRestService :: String -> String -> String -> Int -> Int -> String -> [(String, String)] | |
patchRestService host port obj objId payloadLength payload = let pReq = patchRESTReq obj objId payloadLength payload | |
in | |
[("type","HTTP") | |
,("nodehost",host) | |
,("nodeport",port),("HTTPstr",pReq)] | |
getRestService :: String -> String -> String -> Int -> [(String, String)] | |
getRestService host port obj objId = let gReq = getRESTReq obj objId | |
in | |
[("type","HTTP") | |
,("nodehost",host) | |
,("nodeport",port),("HTTPstr",gReq)] | |
deleteRestService :: String -> String -> String -> Int -> [(String, String)] | |
deleteRestService host port obj objId = let dReq = deleteRESTReq obj objId | |
in | |
[("type","HTTP") | |
,("nodehost",host) | |
,("nodeport",port),("HTTPstr",dReq)] | |
data PostResponse= OK | ErrorPost Value deriving (Typeable, Read,Show) | |
instance Loggable1 PostResponse where | |
serialize _ = undefined | |
deserialize = (ErrorPost <$> deserialize) <|> return OK | |
postParaToJson :: PostParams -> String | |
postParaToJson paras = | |
"{" <> | |
(intercalate "," $ | |
fmap | |
(\pair -> | |
"\"" ++ | |
(BS.unpack $ fst pair) ++ "\"" ++ ":" ++ "\"" ++ snd pair ++ "\"") | |
paras) <> | |
"}" | |
locateServicePrefix :: String -> Text | |
locateServicePrefix site = ("ptm/site/" :: Text) | |
<> (DT.pack site) | |
main = keep $ initNode $ serviceMonitor <|> dataSync <|> ptmApi | |
serviceMonitor = local . async $ do | |
mc <- runClient mqttConfig{_hostname="47.112.196.170", _port=1883, _connID="ptm-service-monitor", | |
-- _cleanSession=False, | |
_lwt=Just $ mkLWT "ptm/status/ptm-service-monitor" "bye for now" False | |
, _msgCB = Just showStatus} | |
print =<< subscribe mc [("ptm/status/+", QoS0)] | |
print =<< waitForClient mc | |
disconnect mc | |
where | |
showStatus _ t m = print (t, m) | |
dataSync = local . async $ do | |
mc <- runClient mqttConfig{_hostname="47.112.196.170", _port=1883, _connID="ptm-data-sync", | |
-- _cleanSession=False, | |
_lwt=Just $ mkLWT "ptm/status/ptm-data-sync" "bye for now" False | |
, _msgCB = Just syncData} | |
print =<< subscribe mc [("ptm/site/+/datasync/#", QoS2)] | |
print =<< waitForClient mc | |
disconnect mc | |
where | |
syncData _ t m = print (t, m) | |
ptmApi = api $ gets <|> posts <|> badRequest | |
where | |
posts= do | |
received POST | |
received ("site" :: String) | |
site' <- param | |
let servicePre = locateServicePrefix site' | |
station servicePre <|> team servicePre <|> user servicePre <|> storeRoom servicePre <|> workType servicePre <|> task servicePre <|> toolClass servicePre <|> toolDefect servicePre <|> tool servicePre <|> operateTool servicePre | |
station servicePre= do | |
received ("station" :: String) | |
postParams <- param :: TransIO PostParams | |
let postJsonReq = postParaToJson postParams | |
do | |
pRes <- runCloud $ (callService (postRestService "47.112.196.170" "9001" "station" (length postJsonReq) postJsonReq) ()) | |
case pRes of | |
OK -> return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: 0" | |
++ "\nConnection: close\n\n" | |
ErrorPost _ -> return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length ("asdfsadf" :: String)) | |
++ "\nConnection: close\n\n" ++ ("asdfasdf" :: String) | |
liftIO $ do | |
mc <- runClient mqttConfig{_hostname="47.112.196.170", _port=1883, _connID="hasqttl", | |
-- _cleanSession=False, | |
_lwt=Just $ mkLWT "tmp/haskquit" "bye for now" False} | |
publishq mc (servicePre <> "/station/create" :: Text) (BS.pack postJsonReq) False QoS2 | |
disconnect mc | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: 0" | |
++ "\nConnection: close\n\n" | |
team servicePre = do | |
received ("team" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
user servicePre = do | |
received ("user" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
storeRoom servicePre = do | |
received ("storeroom" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
workType servicePre = do | |
received ("worktype" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
task servicePre = do | |
received ("task" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
toolClass servicePre = do | |
received ("toolclass" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
toolDefect servicePre = do | |
received ("tooldefect" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
tool servicePre = do | |
received ("tool" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
operateTool servicePre = do | |
received ("operatetool" :: String) | |
received ("id" :: String) | |
sId <- param | |
postParams <- param | |
liftIO $ print (sId :: Int) | |
liftIO $ print (postParams :: PostParams) | |
let msg= "received\n" | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show (length msg) | |
++ "\nConnection: close\n\n" ++ msg | |
gets= do | |
received GET | |
hello <|> hellostream | |
hello= do | |
received ("hello" :: String) | |
name <- param | |
let msg= "hello " ++ name ++ "\n" | |
len= length msg | |
return $ BS.pack $ "HTTP/1.0 200 OK\nContent-Type: text/plain\nContent-Length: "++ show len | |
++ "\nConnection: close\n\n" ++ msg | |
hellostream = do | |
received ("hellos" :: String) | |
name <- param | |
header <|> stream name | |
where | |
header=async $ return $ BS.pack $ | |
"HTTP/1.0 200 OK\nContent-Type: text/plain\nConnection: close\n\n"++ | |
"here follows a stream\n" | |
stream name= do | |
i <- threads 0 $ choose [1 ..] | |
liftIO $ threadDelay 100000 | |
return . BS.pack $ " hello " ++ name ++ " "++ show i | |
badRequest = return $ BS.pack $ | |
let resp="Bad Request\n\ | |
\Usage: GET: host/port/api/hello/<name>, host/port/api/hellos/<name>\n\ | |
\ POST: host/port/api\n" | |
in "HTTP/1.0 400 Bad Request\nContent-Length: " ++ show(length resp) | |
++"\nConnection: close\n\n"++ resp | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment