Skip to content

Instantly share code, notes, and snippets.

@Fgabz
Last active July 30, 2020 09:28
Show Gist options
  • Save Fgabz/751ef0b4cc7b5ce3dfa0310cded74dd0 to your computer and use it in GitHub Desktop.
Save Fgabz/751ef0b4cc7b5ce3dfa0310cded74dd0 to your computer and use it in GitHub Desktop.
TransportLayer for making AppSync subscriptions working with Android Apollo client
import android.net.Uri
import android.util.Base64
import com.aircall.service.graphql.model.RequestGrahpQL
import com.apollographql.apollo.api.internal.Utils.__checkNotNull
import com.apollographql.apollo.subscription.OperationClientMessage
import com.apollographql.apollo.subscription.OperationServerMessage
import com.apollographql.apollo.subscription.SubscriptionTransport
import com.google.gson.Gson
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import org.json.JSONObject
import java.lang.ref.WeakReference
import java.util.concurrent.atomic.AtomicReference
//More info https://github.com/apollographql/apollo-android/issues/1864
//https://github.com/awslabs/aws-mobile-appsync-sdk-android/blob/main/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/WebSocketConnectionManager.java
class AppSyncWebSocketTransportLayer constructor(
private val authKey: String,
private val host: String,
private val webSocketRequest: Request?,
private val webSocketConnectionFactory: WebSocket.Factory?,
private val callback: SubscriptionTransport.Callback?
) : SubscriptionTransport {
private val webSocket =
AtomicReference<WebSocket?>()
private val webSocketListener =
AtomicReference<WebSocketListener?>()
override fun connect() {
val webSocketListener = WebSocketListener(this)
check(this.webSocketListener.compareAndSet(null, webSocketListener)) { "Already connected" }
webSocket.set(webSocketConnectionFactory!!.newWebSocket(webSocketRequest!!, webSocketListener))
}
override fun disconnect(message: OperationClientMessage) {
val socket = webSocket.getAndSet(null)
socket?.close(DISCONNECT_STATUS_CODE, message.toJsonString())
release()
}
override fun send(message: OperationClientMessage) {
val socket = webSocket.get() ?: throw IllegalStateException("Not connected")
val grahpQLRequest = Gson().fromJson(message.toJsonString(), RequestGrahpQL::class.java)
when (grahpQLRequest.type) {
SendMessageType.CONNECTION_INIT.value -> {
socket.send(message.toJsonString())
}
SendMessageType.STOP.value -> {
socket.send(
JSONObject()
.put("type", "stop")
.put("id", grahpQLRequest.id)
.toString()
)
}
else -> {
val data = (JSONObject()
.put("query", grahpQLRequest.payload?.query)
.put("variables", JSONObject(grahpQLRequest.payload?.variables))).toString()
val extensionField = JSONObject()
.put(
"authorization",
JSONObject()
.put("host", host)
.put("Authorization", authKey)
)
socket.send(
JSONObject()
.put("id", grahpQLRequest.id)
.put("type", grahpQLRequest.type)
.put(
"payload", JSONObject()
.put("data", data)
.put("extensions", extensionField)
)
.toString()
)
}
}
}
fun onOpen() {
checkNotNull(callback).onConnected()
}
fun onMessage(message: OperationServerMessage?) {
checkNotNull(callback).onMessage(message)
}
fun onFailure(t: Throwable?) {
try {
checkNotNull(callback).onFailure(t)
} finally {
release()
}
}
fun onClosed() {
try {
checkNotNull(callback).onClosed()
} finally {
release()
}
}
fun release() {
val socketListener = webSocketListener.getAndSet(null)
socketListener?.release()
webSocket.set(null)
}
class WebSocketListener(delegate: AppSyncWebSocketTransportLayer) : okhttp3.WebSocketListener() {
val delegateRef: WeakReference<AppSyncWebSocketTransportLayer> = WeakReference(delegate)
override fun onOpen(webSocket: WebSocket, response: Response) {
val delegate = delegateRef.get()
delegate?.onOpen()
}
override fun onMessage(webSocket: WebSocket, text: String) {
val delegate = delegateRef.get()
if (delegate != null) {
val message = OperationServerMessage.fromJsonString(text)
delegate.onMessage(message)
}
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
val delegate = delegateRef.get()
delegate?.onFailure(t)
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
val delegate = delegateRef.get()
delegate?.onClosed()
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
val delegate = delegateRef.get()
delegate?.onClosed()
}
fun release() {
delegateRef.clear()
}
}
class Factory(serverUrl: String, auth: String, webSocketConnectionFactory: WebSocket.Factory) :
SubscriptionTransport.Factory {
private val webSocketRequest: Request
private val webSocketConnectionFactory: WebSocket.Factory
private val authKey: String = auth
private val host: String = serverUrl.split("/")[2]
override fun create(callback: SubscriptionTransport.Callback): SubscriptionTransport {
__checkNotNull(callback, "callback == null")
return AppSyncWebSocketTransportLayer(
authKey,
host,
webSocketRequest,
webSocketConnectionFactory,
callback
)
}
init {
val header = JSONObject()
.put("host", host)
.put("Authorization", auth)
val finalUrl = Uri.Builder()
.encodedPath(
serverUrl
.replace("appsync-api", "appsync-realtime-api")
.replace("https", "wss")
)
.appendQueryParameter("header", Base64.encodeToString(header.toString().toByteArray(), Base64.DEFAULT))
.appendQueryParameter("payload", "e30=")
.build()
.toString()
this.webSocketRequest = Request.Builder()
.url(finalUrl)
.addHeader("Sec-WebSocket-Protocol", "graphql-ws")
.build()
this.webSocketConnectionFactory =
__checkNotNull(webSocketConnectionFactory, "webSocketConnectionFactory == null")
}
}
enum class SendMessageType(val value: String) {
CONNECTION_INIT("connection_init"),
STOP("stop")
}
companion object {
const val DISCONNECT_STATUS_CODE = 1001
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment