Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save hughjfchen/d661b5670987ed8ef8fa5e6c03498ea4 to your computer and use it in GitHub Desktop.
Save hughjfchen/d661b5670987ed8ef8fa5e6c03498ea4 to your computer and use it in GitHub Desktop.
Code that combines an MQTT client and an HTTP API within a single Transient program
{-# LANGUAGE TypeOperators #-}
module Main
(main
) where
import Protolude hiding (async, local)
import Data.String
import Data.Aeson
import Transient.Internals
import Transient.Move
import Transient.Move.Utils
import Transient.Move.Internals
import Transient.Move.Services
import Transient.Indeterminism
import Control.Applicative
import Transient.Logged
import Control.Concurrent(threadDelay)
import Control.Monad.IO.Class
import qualified Data.ByteString.Lazy.Char8 as BS
import qualified Data.ByteString as BSS
import qualified Data.Text as DT
import Network.MQTT.Client
-- | Raw HTTP/1.1 @DELETE@ request text for the row of @obj@ selected by
-- the query @id=eq.<objId>@.
--
-- The @Host@ header carries the literal placeholder @$hostnode@; this
-- function does not substitute it.
deleteRESTReq :: String -> Int -> String
deleteRESTReq obj objId =
  mconcat
    [ "DELETE /", obj, "?id=eq.", show objId, " HTTP/1.1\r\n"
    , "Host: $hostnode\r\n"
    , "User-Agent: transient/0.5.2\r\n"
    , "Accept: */*\r\n"
    , "Connection: close\r\n"
    , "\r\n"
    ]
-- | Raw HTTP/1.1 @GET@ request text for the row of @obj@ selected by the
-- query @id=eq.<objId>@.
--
-- The @Host@ header carries the literal placeholder @$hostnode@; this
-- function does not substitute it.
getRESTReq :: String -> Int -> String
getRESTReq obj objId =
  mconcat
    [ "GET /", obj, "?id=eq.", show objId, " HTTP/1.1\r\n"
    , "Host: $hostnode\r\n"
    , "User-Agent: transient/0.5.2\r\n"
    , "Accept: */*\r\n"
    , "Connection: close\r\n"
    , "\r\n"
    ]
-- | Raw HTTP/1.1 @POST@ request text that sends @payload@ to @/<obj>@ as
-- @application/json@.
--
-- @payloadLength@ is written verbatim into the @Content-Length@ header; it
-- is not checked against @payload@, so the caller must supply the right
-- value.
postRESTReq :: String -> Int -> String -> String
postRESTReq obj payloadLength payload =
  mconcat
    [ "POST /", obj, " HTTP/1.1\r\n"
    , "Host: $hostnode\r\n"
    , "User-Agent: transient/0.5.2\r\n"
    , "Accept: */*\r\n"
    , "Connection: close\r\n"
    , "Content-Type: application/json\r\n"
    , "Content-Length: ", show payloadLength, "\r\n\r\n"
    , payload
    ]
-- | Raw HTTP/1.1 @PATCH@ request text that sends @payload@ as
-- @application/json@ to the row of @obj@ selected by @id=eq.<objId>@.
--
-- @payloadLength@ is written verbatim into the @Content-Length@ header; it
-- is not checked against @payload@.
patchRESTReq :: String -> Int -> Int -> String -> String
patchRESTReq obj objId payloadLength payload =
  mconcat
    [ "PATCH /", obj, "?id=eq.", show objId, " HTTP/1.1\r\n"
    , "Host: $hostnode\r\n"
    , "User-Agent: transient/0.5.2\r\n"
    , "Accept: */*\r\n"
    , "Connection: close\r\n"
    , "Content-Type: application/json\r\n"
    , "Content-Length: ", show payloadLength, "\r\n\r\n"
    , payload
    ]
-- | Service descriptor (a key/value list) for a POST against the REST
-- backend at @host@:@port@. The request text built by 'postRESTReq' is
-- stored under the @HTTPstr@ key.
postRestService :: String -> String -> String -> Int -> String -> [(Package, Program)]
postRestService host port obj payloadLength payload =
  [ ("type", "HTTP")
  , ("nodehost", host)
  , ("nodeport", port)
  , ("HTTPstr", postRESTReq obj payloadLength payload)
  ]
-- | Service descriptor (a key/value list) for a PATCH against the REST
-- backend at @host@:@port@. The request text built by 'patchRESTReq' is
-- stored under the @HTTPstr@ key.
patchRestService :: String -> String -> String -> Int -> Int -> String -> [(String, String)]
patchRestService host port obj objId payloadLength payload =
  [ ("type", "HTTP")
  , ("nodehost", host)
  , ("nodeport", port)
  , ("HTTPstr", patchRESTReq obj objId payloadLength payload)
  ]
-- | Service descriptor (a key/value list) for a GET against the REST
-- backend at @host@:@port@. The request text built by 'getRESTReq' is
-- stored under the @HTTPstr@ key.
getRestService :: String -> String -> String -> Int -> [(String, String)]
getRestService host port obj objId =
  [ ("type", "HTTP")
  , ("nodehost", host)
  , ("nodeport", port)
  , ("HTTPstr", getRESTReq obj objId)
  ]
-- | Service descriptor (a key/value list) for a DELETE against the REST
-- backend at @host@:@port@. The request text built by 'deleteRESTReq' is
-- stored under the @HTTPstr@ key.
deleteRestService :: String -> String -> String -> Int -> [(String, String)]
deleteRestService host port obj objId =
  [ ("type", "HTTP")
  , ("nodehost", host)
  , ("nodeport", port)
  , ("HTTPstr", deleteRESTReq obj objId)
  ]
-- | Result of a POST issued through 'callService': either success ('OK')
-- or a failure that carries the decoded JSON value ('ErrorPost').
data PostResponse = OK | ErrorPost Value deriving (Typeable, Read, Show)

instance Loggable1 PostResponse where
  -- Serialisation is not implemented. Fail with a message that names the
  -- culprit instead of a bare 'undefined'.
  serialize _ = error "PostResponse.serialize: not implemented"

  -- First try to decode a JSON 'Value' and report it as 'ErrorPost'; when
  -- that decoder fails, fall back to 'OK'.
  deserialize = (ErrorPost <$> deserialize) <|> return OK
-- | Render POST parameters as a flat JSON object.
--
-- Every parameter becomes one @"key":"value"@ member; values are always
-- emitted as JSON strings. Keys and values are escaped (quote, backslash
-- and control characters), so request parameters cannot break out of the
-- string they are placed in. For input without such characters the output
-- is the plain @{"k":"v",...}@ text.
postParaToJson :: PostParams -> String
postParaToJson paras =
  "{" <> intercalate "," [member (BS.unpack key) value | (key, value) <- paras] <> "}"
  where
    member :: String -> String -> String
    member key value = quoted key ++ ":" ++ quoted value

    quoted :: String -> String
    quoted s = "\"" ++ concatMap escapeChar s ++ "\""

    -- JSON string escaping: the two mandatory characters, the common short
    -- escapes, and \u00XX for the remaining control characters.
    escapeChar :: Char -> String
    escapeChar c = case c of
      '"' -> "\\\""
      '\\' -> "\\\\"
      '\n' -> "\\n"
      '\r' -> "\\r"
      '\t' -> "\\t"
      _
        | c < ' ' -> "\\u00" ++ [hexDigit hi, hexDigit lo]
        | otherwise -> [c]
      where
        (hi, lo) = fromEnum c `divMod` 16

    -- Lower-case hexadecimal digit for a value in the range 0..15.
    hexDigit :: Int -> Char
    hexDigit n
      | n < 10 = toEnum (fromEnum '0' + n)
      | otherwise = toEnum (fromEnum 'a' + n - 10)
-- | MQTT topic prefix for a site: @ptm/site/<site>@.
locateServicePrefix :: String -> Text
locateServicePrefix site = DT.pack ("ptm/site/" ++ site)
-- | Program entry point: starts the node and offers the MQTT status
-- monitor, the MQTT data-sync listener and the HTTP API as alternatives.
main = keep . initNode $ serviceMonitor <|> dataSync <|> ptmApi
-- | MQTT status listener.
--
-- Connects to the broker as @ptm-service-monitor@, subscribes to
-- @ptm/status/+@ at QoS0 and prints every delivered (topic, message) pair.
-- The subscribe result and the result of 'waitForClient' are printed as
-- well; the client is disconnected once 'waitForClient' returns.
serviceMonitor = local . async $ do
  client <- runClient monitorConfig
  subscribe client [("ptm/status/+", QoS0)] >>= print
  waitForClient client >>= print
  disconnect client
  where
    monitorConfig =
      mqttConfig
        { _hostname = "47.112.196.170"
        , _port = 1883
        , _connID = "ptm-service-monitor"
        -- , _cleanSession = False
        , _lwt = Just $ mkLWT "ptm/status/ptm-service-monitor" "bye for now" False
        , _msgCB = Just logMessage
        }

    -- Message callback: dump topic and payload; the first argument is unused.
    logMessage _ topic payload = print (topic, payload)
-- | MQTT data-sync listener.
--
-- Connects to the broker as @ptm-data-sync@, subscribes to
-- @ptm/site/+/datasync/#@ at QoS2 and, for now, only prints every delivered
-- (topic, message) pair. The subscribe result and the result of
-- 'waitForClient' are printed as well; the client is disconnected once
-- 'waitForClient' returns.
dataSync = local . async $ do
  client <- runClient syncConfig
  subscribe client [("ptm/site/+/datasync/#", QoS2)] >>= print
  waitForClient client >>= print
  disconnect client
  where
    syncConfig =
      mqttConfig
        { _hostname = "47.112.196.170"
        , _port = 1883
        , _connID = "ptm-data-sync"
        -- , _cleanSession = False
        , _lwt = Just $ mkLWT "ptm/status/ptm-data-sync" "bye for now" False
        , _msgCB = Just logMessage
        }

    -- Message callback: dump topic and payload; the first argument is unused.
    logMessage _ topic payload = print (topic, payload)
-- | HTTP API served through Transient's 'api' combinator.
--
-- Handlers are tried in order: the GET routes, the POST routes, and finally
-- a 400 response carrying a short usage text.
--
-- * GET @hello@, then a name: answers a single plain-text greeting.
-- * GET @hellos@, then a name: answers a header followed by an endless
--   stream of numbered greetings.
-- * POST @site@, a site name, @station@: converts the POST parameters to
--   JSON, sends them to the REST backend and, only when the backend reports
--   success, publishes the JSON on @ptm/site/<site>/station/create@.
-- * POST @site@, a site name, one of @team@, @user@, @storeroom@,
--   @worktype@, @task@, @toolclass@, @tooldefect@, @tool@, @operatetool@,
--   then @id@ and an integer: prints the id and the POST parameters and
--   answers @received@.
--
-- NOTE(review): the exact URL shape depends on how 'received' and 'param'
-- consume request elements; confirm against the Transient documentation.
ptmApi = api $ gets <|> posts <|> badRequest
  where
    -- Complete plain-text HTTP/1.0 response with the given status
    -- (e.g. "200 OK"). Content-Length is the character count of the body,
    -- which is how every handler here has always computed it.
    plainResponse :: String -> String -> BS.ByteString
    plainResponse status body =
      BS.pack $
        "HTTP/1.0 " ++ status ++ "\nContent-Type: text/plain\nContent-Length: "
          ++ show (length body)
          ++ "\nConnection: close\n\n"
          ++ body

    plainOk :: String -> BS.ByteString
    plainOk = plainResponse "200 OK"

    posts = do
      received POST
      received ("site" :: String)
      site' <- param
      let servicePre = locateServicePrefix site'
      station servicePre
        <|> idEndpoint "team"
        <|> idEndpoint "user"
        <|> idEndpoint "storeroom"
        <|> idEndpoint "worktype"
        <|> idEndpoint "task"
        <|> idEndpoint "toolclass"
        <|> idEndpoint "tooldefect"
        <|> idEndpoint "tool"
        <|> idEndpoint "operatetool"

    -- Create a station: forward the parameters to the REST backend, then
    -- announce the creation over MQTT.
    station servicePre = do
      received ("station" :: String)
      postParams <- param :: TransIO PostParams
      let postJsonReq = postParaToJson postParams
      pRes <- runCloud $ callService (postRestService "47.112.196.170" "9001" "station" (length postJsonReq) postJsonReq) ()
      case pRes of
        OK -> do
          -- Publish only after the backend accepted the record, so that
          -- subscribers never hear about a station that was not stored.
          liftIO $ do
            mc <- runClient mqttConfig
              { _hostname = "47.112.196.170"
              , _port = 1883
              , _connID = "hasqttl"
              -- , _cleanSession = False
              , _lwt = Just $ mkLWT "tmp/haskquit" "bye for now" False
              }
            publishq mc (servicePre <> "/station/create" :: Text) (BS.pack postJsonReq) False QoS2
            disconnect mc
          return $ plainOk ""
        ErrorPost err -> do
          -- The backend rejected the request: log its answer and tell the
          -- caller, instead of silently reporting success.
          liftIO $ print err
          return $ plainResponse "502 Bad Gateway" "station could not be created\n"

    -- Shared handler for the "<resource>/id/<n>" POST routes: it only logs
    -- what it received and acknowledges.
    idEndpoint :: String -> TransIO BS.ByteString
    idEndpoint resource = do
      received resource
      received ("id" :: String)
      sId <- param
      postParams <- param
      liftIO $ print (sId :: Int)
      liftIO $ print (postParams :: PostParams)
      return $ plainOk "received\n"

    gets = do
      received GET
      hello <|> hellostream

    hello = do
      received ("hello" :: String)
      name <- param
      return $ plainOk ("hello " ++ name ++ "\n")

    hellostream = do
      received ("hellos" :: String)
      name <- param
      header <|> stream name
      where
        -- No Content-Length here: the body is an open-ended stream.
        header = async $ return $ BS.pack $
          "HTTP/1.0 200 OK\nContent-Type: text/plain\nConnection: close\n\n" ++
          "here follows a stream\n"
        stream name = do
          i <- threads 0 $ choose [1 ..]
          liftIO $ threadDelay 100000
          return . BS.pack $ " hello " ++ name ++ " " ++ show i

    -- Fallback when no route matched.
    badRequest = return $ BS.pack $
      let resp = "Bad Request\n\
                 \Usage: GET: host/port/api/hello/<name>, host/port/api/hellos/<name>\n\
                 \ POST: host/port/api\n"
      in "HTTP/1.0 400 Bad Request\nContent-Length: " ++ show (length resp)
           ++ "\nConnection: close\n\n" ++ resp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment