Created
May 3, 2021 14:59
-
-
Save zeebo/ae635824205088a51031c09cb07ea553 to your computer and use it in GitHub Desktop.
websocket demo code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package drpcws | |
import ( | |
"context" | |
"net/http" | |
"sync" | |
"github.com/gobwas/ws" | |
"github.com/gobwas/ws/wsutil" | |
"github.com/zeebo/errs" | |
"storj.io/drpc" | |
"storj.io/drpc/drpchttp" | |
) | |
func NewGobwasConn(dialer ws.Dialer, url string) drpc.Conn { | |
return &gobwasConn{dialer: dialer, url: url} | |
} | |
// TODO(jeff): Invoke and NewStream dial a new websocket each time. there are many | |
// ways we might want to do something different, but this is fine for now. the | |
// path this code takes does have the unfortunate side effect of making the | |
// Close, Closed and Transport methods no-ops. | |
type gobwasConn struct { | |
dialer ws.Dialer | |
url string | |
} | |
func (c *gobwasConn) Close() error { return nil } | |
func (c *gobwasConn) Closed() bool { return false } | |
func (c *gobwasConn) Transport() drpc.Transport { return nil } | |
func (c *gobwasConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in drpc.Message, out drpc.Message) error { | |
stream, err := c.NewStream(ctx, rpc, enc) | |
if err != nil { | |
return err | |
} | |
defer func() { _ = stream.Close() }() | |
if err := stream.MsgSend(in, enc); err != nil { | |
return err | |
} | |
if err := stream.MsgRecv(out, enc); err != nil { | |
return err | |
} | |
return nil | |
} | |
// NewStream starts a stream with the remote. Only one Invoke or Stream may be | |
// open at once. | |
func (c *gobwasConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) { | |
// TODO(jeff): support sending application/json as the content-type | |
// TODO: parse + add rpc correctly | |
conn, _, _, err := c.dialer.Dial(ctx, c.url+rpc) | |
if err != nil { | |
return nil, err | |
} | |
return newGobwasStream(ctx, conn, false, true), nil | |
} | |
func NewGobwas(handler drpc.Handler) http.Handler { | |
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { | |
ctx, err := drpchttp.Context(req) | |
if err != nil { | |
http.Error(rw, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
conn, _, _, err := ws.UpgradeHTTP(req, rw) | |
if err != nil { | |
http.Error(rw, err.Error(), http.StatusBadRequest) | |
return | |
} | |
defer func() { _ = conn.Close() }() | |
stream := newGobwasStream(ctx, conn, req.Header.Get("Content-Type") == "application/json", false) | |
defer func() { _ = stream.Close() }() | |
if err := handler.HandleRPC(stream, req.URL.Path); err != nil { | |
// TODO(jeff): how to signal this error back after we've hijacked? | |
// json envelope? what about if protobuf? | |
_ = err | |
} | |
}) | |
} | |
// TODO(jeff): gobwasStream probably needs to see what kinds of errors happen in | |
// half closed situations, and return the appropriate errors back to callers | |
type gobwasStream struct { | |
mu sync.Mutex | |
ctx context.Context | |
tr drpc.Transport | |
json bool | |
client bool | |
} | |
func newGobwasStream(ctx context.Context, tr drpc.Transport, json, client bool) *gobwasStream { | |
return &gobwasStream{ | |
ctx: ctx, | |
tr: tr, | |
json: json, | |
client: client, | |
} | |
} | |
func (w *gobwasStream) Context() context.Context { return w.ctx } | |
func (w *gobwasStream) MsgSend(msg drpc.Message, enc drpc.Encoding) (err error) { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
var data []byte | |
if w.json { | |
data, err = drpchttp.JSONMarshal(msg, enc) | |
} else { | |
data, err = enc.Marshal(msg) | |
} | |
if err != nil { | |
return err | |
} | |
if w.client { | |
return wsutil.WriteClientBinary(w.tr, data) | |
} else { | |
return wsutil.WriteServerBinary(w.tr, data) | |
} | |
} | |
func (w *gobwasStream) MsgRecv(msg drpc.Message, enc drpc.Encoding) (err error) { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
var data []byte | |
if w.client { | |
data, err = wsutil.ReadServerBinary(w.tr) | |
} else { | |
data, err = wsutil.ReadClientBinary(w.tr) | |
} | |
if err != nil { | |
return err | |
} | |
if w.json { | |
return drpchttp.JSONUnmarshal(data, msg, enc) | |
} else { | |
return enc.Unmarshal(data, msg) | |
} | |
} | |
func (w *gobwasStream) CloseSend() error { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
return ws.WriteFrame(w.tr, ws.NewCloseFrame(nil)) | |
} | |
func (w *gobwasStream) Close() error { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
return errs.Combine(ws.WriteFrame(w.tr, ws.NewCloseFrame(nil)), w.tr.Close()) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package drpcws | |
import ( | |
"context" | |
"net/http" | |
"sync" | |
"time" | |
"github.com/gorilla/websocket" | |
"storj.io/drpc" | |
"storj.io/drpc/drpchttp" | |
) | |
func NewGorilla(handler drpc.Handler) http.Handler { | |
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { | |
ctx, err := drpchttp.Context(req) | |
if err != nil { | |
http.Error(rw, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
var upgrader = websocket.Upgrader{ | |
ReadBufferSize: 1024, | |
WriteBufferSize: 1024, | |
} | |
conn, err := upgrader.Upgrade(rw, req, nil) | |
if err != nil { | |
return | |
} | |
defer func() { _ = conn.Close() }() | |
stream := newGorillaStream(ctx, conn, req.Header.Get("Content-Type") == "application/json") | |
defer func() { _ = stream.Close() }() | |
if err := handler.HandleRPC(stream, req.URL.Path); err != nil { | |
// TODO(jeff): how to signal this error back after we've hijacked? | |
// json envelope? what about if protobuf? | |
_ = err | |
} | |
}) | |
} | |
// TODO(jeff): gorillaStream probably needs to see what kinds of errors happen in | |
// half closed situations, and return the appropriate errors back to callers | |
type gorillaStream struct { | |
mu sync.Mutex | |
ctx context.Context | |
conn *websocket.Conn | |
json bool | |
} | |
func newGorillaStream(ctx context.Context, conn *websocket.Conn, json bool) *gorillaStream { | |
return &gorillaStream{ | |
ctx: ctx, | |
conn: conn, | |
json: json, | |
} | |
} | |
func (w *gorillaStream) Context() context.Context { return w.ctx } | |
func (w *gorillaStream) MsgSend(msg drpc.Message, enc drpc.Encoding) (err error) { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
var data []byte | |
if w.json { | |
data, err = drpchttp.JSONMarshal(msg, enc) | |
} else { | |
data, err = enc.Marshal(msg) | |
} | |
if err != nil { | |
return err | |
} | |
return w.conn.WriteMessage(websocket.BinaryMessage, data) | |
} | |
func (w *gorillaStream) MsgRecv(msg drpc.Message, enc drpc.Encoding) (err error) { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
_, data, err := w.conn.ReadMessage() | |
if err != nil { | |
return err | |
} | |
if w.json { | |
return drpchttp.JSONUnmarshal(data, msg, enc) | |
} else { | |
return enc.Unmarshal(data, msg) | |
} | |
} | |
func (w *gorillaStream) CloseSend() error { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
return w.conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Minute)) | |
} | |
func (w *gorillaStream) Close() error { | |
w.mu.Lock() | |
defer w.mu.Unlock() | |
return w.conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Minute)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment