Last active
July 30, 2020 09:28
-
-
Save Fgabz/751ef0b4cc7b5ce3dfa0310cded74dd0 to your computer and use it in GitHub Desktop.
TransportLayer for making AppSync subscriptions working with Android Apollo client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.net.Uri | |
import android.util.Base64 | |
import com.aircall.service.graphql.model.RequestGrahpQL | |
import com.apollographql.apollo.api.internal.Utils.__checkNotNull | |
import com.apollographql.apollo.subscription.OperationClientMessage | |
import com.apollographql.apollo.subscription.OperationServerMessage | |
import com.apollographql.apollo.subscription.SubscriptionTransport | |
import com.google.gson.Gson | |
import okhttp3.Request | |
import okhttp3.Response | |
import okhttp3.WebSocket | |
import org.json.JSONObject | |
import java.lang.ref.WeakReference | |
import java.util.concurrent.atomic.AtomicReference | |
//More info https://github.com/apollographql/apollo-android/issues/1864 | |
//https://github.com/awslabs/aws-mobile-appsync-sdk-android/blob/main/aws-android-sdk-appsync/src/main/java/com/amazonaws/mobileconnectors/appsync/WebSocketConnectionManager.java | |
class AppSyncWebSocketTransportLayer constructor( | |
private val authKey: String, | |
private val host: String, | |
private val webSocketRequest: Request?, | |
private val webSocketConnectionFactory: WebSocket.Factory?, | |
private val callback: SubscriptionTransport.Callback? | |
) : SubscriptionTransport { | |
private val webSocket = | |
AtomicReference<WebSocket?>() | |
private val webSocketListener = | |
AtomicReference<WebSocketListener?>() | |
override fun connect() { | |
val webSocketListener = WebSocketListener(this) | |
check(this.webSocketListener.compareAndSet(null, webSocketListener)) { "Already connected" } | |
webSocket.set(webSocketConnectionFactory!!.newWebSocket(webSocketRequest!!, webSocketListener)) | |
} | |
override fun disconnect(message: OperationClientMessage) { | |
val socket = webSocket.getAndSet(null) | |
socket?.close(DISCONNECT_STATUS_CODE, message.toJsonString()) | |
release() | |
} | |
override fun send(message: OperationClientMessage) { | |
val socket = webSocket.get() ?: throw IllegalStateException("Not connected") | |
val grahpQLRequest = Gson().fromJson(message.toJsonString(), RequestGrahpQL::class.java) | |
when (grahpQLRequest.type) { | |
SendMessageType.CONNECTION_INIT.value -> { | |
socket.send(message.toJsonString()) | |
} | |
SendMessageType.STOP.value -> { | |
socket.send( | |
JSONObject() | |
.put("type", "stop") | |
.put("id", grahpQLRequest.id) | |
.toString() | |
) | |
} | |
else -> { | |
val data = (JSONObject() | |
.put("query", grahpQLRequest.payload?.query) | |
.put("variables", JSONObject(grahpQLRequest.payload?.variables))).toString() | |
val extensionField = JSONObject() | |
.put( | |
"authorization", | |
JSONObject() | |
.put("host", host) | |
.put("Authorization", authKey) | |
) | |
socket.send( | |
JSONObject() | |
.put("id", grahpQLRequest.id) | |
.put("type", grahpQLRequest.type) | |
.put( | |
"payload", JSONObject() | |
.put("data", data) | |
.put("extensions", extensionField) | |
) | |
.toString() | |
) | |
} | |
} | |
} | |
fun onOpen() { | |
checkNotNull(callback).onConnected() | |
} | |
fun onMessage(message: OperationServerMessage?) { | |
checkNotNull(callback).onMessage(message) | |
} | |
fun onFailure(t: Throwable?) { | |
try { | |
checkNotNull(callback).onFailure(t) | |
} finally { | |
release() | |
} | |
} | |
fun onClosed() { | |
try { | |
checkNotNull(callback).onClosed() | |
} finally { | |
release() | |
} | |
} | |
fun release() { | |
val socketListener = webSocketListener.getAndSet(null) | |
socketListener?.release() | |
webSocket.set(null) | |
} | |
class WebSocketListener(delegate: AppSyncWebSocketTransportLayer) : okhttp3.WebSocketListener() { | |
val delegateRef: WeakReference<AppSyncWebSocketTransportLayer> = WeakReference(delegate) | |
override fun onOpen(webSocket: WebSocket, response: Response) { | |
val delegate = delegateRef.get() | |
delegate?.onOpen() | |
} | |
override fun onMessage(webSocket: WebSocket, text: String) { | |
val delegate = delegateRef.get() | |
if (delegate != null) { | |
val message = OperationServerMessage.fromJsonString(text) | |
delegate.onMessage(message) | |
} | |
} | |
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { | |
val delegate = delegateRef.get() | |
delegate?.onFailure(t) | |
} | |
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { | |
val delegate = delegateRef.get() | |
delegate?.onClosed() | |
} | |
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { | |
val delegate = delegateRef.get() | |
delegate?.onClosed() | |
} | |
fun release() { | |
delegateRef.clear() | |
} | |
} | |
class Factory(serverUrl: String, auth: String, webSocketConnectionFactory: WebSocket.Factory) : | |
SubscriptionTransport.Factory { | |
private val webSocketRequest: Request | |
private val webSocketConnectionFactory: WebSocket.Factory | |
private val authKey: String = auth | |
private val host: String = serverUrl.split("/")[2] | |
override fun create(callback: SubscriptionTransport.Callback): SubscriptionTransport { | |
__checkNotNull(callback, "callback == null") | |
return AppSyncWebSocketTransportLayer( | |
authKey, | |
host, | |
webSocketRequest, | |
webSocketConnectionFactory, | |
callback | |
) | |
} | |
init { | |
val header = JSONObject() | |
.put("host", host) | |
.put("Authorization", auth) | |
val finalUrl = Uri.Builder() | |
.encodedPath( | |
serverUrl | |
.replace("appsync-api", "appsync-realtime-api") | |
.replace("https", "wss") | |
) | |
.appendQueryParameter("header", Base64.encodeToString(header.toString().toByteArray(), Base64.DEFAULT)) | |
.appendQueryParameter("payload", "e30=") | |
.build() | |
.toString() | |
this.webSocketRequest = Request.Builder() | |
.url(finalUrl) | |
.addHeader("Sec-WebSocket-Protocol", "graphql-ws") | |
.build() | |
this.webSocketConnectionFactory = | |
__checkNotNull(webSocketConnectionFactory, "webSocketConnectionFactory == null") | |
} | |
} | |
enum class SendMessageType(val value: String) { | |
CONNECTION_INIT("connection_init"), | |
STOP("stop") | |
} | |
companion object { | |
const val DISCONNECT_STATUS_CODE = 1001 | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment