Skip to content

Instantly share code, notes, and snippets.

@Partyschaum
Created August 18, 2020 13:51
Show Gist options
  • Save Partyschaum/ad03a973dd689d7b6f7320ce4015a9f9 to your computer and use it in GitHub Desktop.
Save Partyschaum/ad03a973dd689d7b6f7320ce4015a9f9 to your computer and use it in GitHub Desktop.
/**
 * Thrift transport that buffers everything passed to [write] and, on [flush], sends the buffered
 * bytes as a single HTTP POST. The bytes of the most recently flushed request are kept in [data]
 * and can be read back as text through [rawRequest].
 *
 * When an [HttpClient] is supplied the request is executed through it; otherwise a plain
 * [HttpURLConnection] is opened on the configured URL.
 */
class THttpClientOfferingRawRequest : TTransport {
    // Assigned by every constructor; nullable only because it is set inside a try block.
    private var url: URL? = null

    // Accumulates written bytes until the next flush().
    private val requestBuffer = ByteArrayOutputStream()

    // Response body of the last flushed request, or null before the first flush / after close().
    private var inputStream: InputStream? = null

    // Timeouts are only applied to HttpURLConnection when greater than zero.
    private var connectTimeout = 0
    private var readTimeout = 0

    private var customHeaders: MutableMap<String, String>? = null
    private var host: HttpHost? = null
    private var client: HttpClient? = null

    /** Bytes of the most recently flushed request. Not initialized until the first [flush]. */
    lateinit var data: ByteArray

    /**
     * Returns the most recently flushed request decoded as a string.
     *
     * Accessing this before the first [flush] fails because [data] has not been assigned yet.
     */
    fun rawRequest(): String = String(data)

    /**
     * Transport factory producing [THttpClientOfferingRawRequest] instances for a fixed URL and
     * optional [HttpClient].
     */
    class Factory : TTransportFactory {
        private val url: String
        private val client: HttpClient?

        constructor(url: String) {
            this.url = url
            this.client = null
        }

        constructor(url: String, client: HttpClient?) {
            this.url = url
            this.client = client
        }

        /**
         * Creates a new transport for the configured URL; [trans] is not used.
         *
         * @return the new transport, or `null` when construction fails with a [TTransportException].
         */
        override fun getTransport(trans: TTransport): TTransport? {
            return try {
                // Build the raw-request-capturing transport rather than the stock THttpClient,
                // otherwise transports obtained through the factory could never offer rawRequest().
                if (client != null) {
                    THttpClientOfferingRawRequest(url, client)
                } else {
                    THttpClientOfferingRawRequest(url)
                }
            } catch (tte: TTransportException) {
                null
            }
        }
    }

    /**
     * Creates a transport that talks to [url] through [HttpURLConnection].
     *
     * An [IOException] raised while parsing [url] is rethrown wrapped in a [TTransportException].
     */
    constructor(url: String?) {
        try {
            this.url = URL(url)
        } catch (iox: IOException) {
            throw TTransportException(iox)
        }
    }

    /**
     * Creates a transport that talks to [url] through the supplied [client].
     *
     * The target host is derived from the URL, falling back to the protocol's default port when
     * the URL does not specify one. An [IOException] raised while parsing [url] is rethrown
     * wrapped in a [TTransportException].
     */
    constructor(url: String?, client: HttpClient?) {
        try {
            val parsed = URL(url)
            this.url = parsed
            this.client = client
            host = HttpHost(
                parsed.host,
                if (parsed.port == -1) parsed.defaultPort else parsed.port,
                parsed.protocol
            )
        } catch (iox: IOException) {
            throw TTransportException(iox)
        }
    }

    /** Stores the connect timeout and, when a client is configured, sets it on the client params. */
    fun setConnectTimeout(timeout: Int) {
        connectTimeout = timeout
        // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the
        // same HttpClient is used for something else.
        client?.params?.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, connectTimeout)
    }

    /** Stores the read timeout and, when a client is configured, sets it on the client params. */
    fun setReadTimeout(timeout: Int) {
        readTimeout = timeout
        client?.params?.setParameter(CoreConnectionPNames.SO_TIMEOUT, readTimeout)
    }

    /** Replaces the whole set of custom headers sent with each request. */
    fun setCustomHeaders(headers: MutableMap<String, String>?) {
        customHeaders = headers
    }

    /** Adds or overwrites a single custom header, creating the header map on first use. */
    fun setCustomHeader(key: String, value: String) {
        val headers = customHeaders ?: HashMap<String, String>().also { customHeaders = it }
        headers[key] = value
    }

    /** No-op: a connection is only made when [flush] is called. */
    override fun open() {}

    /** Closes and forgets the current response stream, if there is one. */
    override fun close() {
        val stream = inputStream ?: return
        try {
            stream.close()
        } catch (ioe: IOException) {
            // Deliberately ignored: closing is best-effort and the stream is dropped either way.
        }
        inputStream = null
    }

    /** Always reports the transport as open. */
    override fun isOpen(): Boolean = true

    /**
     * Reads up to [len] bytes of the last response into [buf] starting at [off].
     *
     * @return the number of bytes read.
     * @throws TTransportException if nothing has been flushed yet, the response is exhausted,
     * or the underlying stream fails.
     */
    @Throws(TTransportException::class)
    override fun read(buf: ByteArray, off: Int, len: Int): Int {
        val stream = inputStream
            ?: throw TTransportException("Response buffer is empty, no request.")
        return try {
            val count = stream.read(buf, off, len)
            if (count == -1) {
                throw TTransportException("No more data available.")
            }
            count
        } catch (iox: IOException) {
            throw TTransportException(iox)
        }
    }

    /** Appends [len] bytes of [buf], starting at [off], to the pending request. */
    override fun write(buf: ByteArray, off: Int, len: Int) {
        requestBuffer.write(buf, off, len)
    }

    /**
     * Sends the buffered request through the configured [HttpClient] and buffers the whole
     * response in memory so the connection can be released immediately.
     *
     * @throws TTransportException if no client is configured, the response status is not 200,
     * or an I/O error occurs.
     */
    @Throws(TTransportException::class)
    private fun flushUsingHttpClient() {
        val httpClient = client ?: throw TTransportException("Null HttpClient, aborting.")

        // Extract request and reset buffer. The bytes go into the `data` property (not a local)
        // so rawRequest() also works when the request is sent through an HttpClient.
        data = requestBuffer.toByteArray()
        requestBuffer.reset()

        // Set request to path + query string
        val target = checkNotNull(url) { "url is assigned by every constructor" }
        val post = HttpPost(target.file)
        var responseStream: InputStream? = null
        try {
            //
            // Headers are added to the HttpPost instance, not
            // to HttpClient.
            //
            post.setHeader("Content-Type", "application/x-thrift")
            post.setHeader("Accept", "application/x-thrift")
            post.setHeader("User-Agent", "Java/THttpClient/HC")
            customHeaders?.forEach { (key, value) -> post.setHeader(key, value) }
            post.entity = ByteArrayEntity(data)

            val response = httpClient.execute(host, post)
            val responseCode = response.statusLine.statusCode
            //
            // Retrieve the inputstream BEFORE checking the status code so
            // resources get freed in the finally clause. A response may carry no entity at
            // all; in that case there is simply nothing to read or close.
            //
            val entity: HttpEntity? = response.entity
            responseStream = entity?.content
            if (responseCode != HttpStatus.SC_OK) {
                throw TTransportException("HTTP Response code: $responseCode")
            }

            // Read the responses into a byte array so we can release the connection
            // early. This implies that the whole content will have to be read in
            // memory, and that momentarily we might use up twice the memory (while the
            // thrift struct is being read up the chain).
            // Proceeding differently might lead to exhaustion of connections and thus
            // to app failure.
            val body = ByteArrayOutputStream()
            responseStream?.copyTo(body)
            try {
                // Indicate we're done with the content.
                consume(entity)
            } catch (ioe: IOException) {
                // We ignore this exception, it might only mean the server has no
                // keep-alive capability.
            }
            inputStream = ByteArrayInputStream(body.toByteArray())
        } catch (ioe: IOException) {
            // Abort method so the connection gets released back to the connection manager
            post.abort()
            throw TTransportException(ioe)
        } finally {
            try {
                // Close the entity's input stream, this will release the underlying connection
                responseStream?.close()
            } catch (ioe: IOException) {
                throw TTransportException(ioe)
            } finally {
                // Release the connection even when closing the stream failed.
                post.releaseConnection()
            }
        }
    }

    /**
     * Sends everything written since the previous flush as one HTTP POST and makes the response
     * available to [read]. The sent bytes are kept in [data].
     *
     * @throws TTransportException if the response status is not 200 or an I/O error occurs.
     */
    @Throws(TTransportException::class)
    override fun flush() {
        if (client != null) {
            flushUsingHttpClient()
            return
        }

        // Extract request and reset buffer
        data = requestBuffer.toByteArray()
        requestBuffer.reset()
        try {
            // Create connection object
            val target = checkNotNull(url) { "url is assigned by every constructor" }
            val connection = target.openConnection() as HttpURLConnection

            // Timeouts, only if explicitly set
            if (connectTimeout > 0) {
                connection.connectTimeout = connectTimeout
            }
            if (readTimeout > 0) {
                connection.readTimeout = readTimeout
            }

            // Make the request
            connection.requestMethod = "POST"
            connection.setRequestProperty("Content-Type", "application/x-thrift")
            connection.setRequestProperty("Accept", "application/x-thrift")
            connection.setRequestProperty("User-Agent", "Java/THttpClient")
            customHeaders?.forEach { (key, value) -> connection.setRequestProperty(key, value) }
            connection.doOutput = true
            connection.connect()
            // Close the request body stream once written instead of leaving it open.
            connection.outputStream.use { it.write(data) }

            val responseCode = connection.responseCode
            if (responseCode != HttpURLConnection.HTTP_OK) {
                throw TTransportException("HTTP Response code: $responseCode")
            }

            // Read the responses
            inputStream = connection.inputStream
        } catch (iox: IOException) {
            throw TTransportException(iox)
        }
    }

    companion object {
        /**
         * Copy of org.apache.http.util.EntityUtils#consume. Android has its own httpcore
         * that doesn't have a consume.
         *
         * Closes the content stream of a streaming [entity]; does nothing for `null` or
         * non-streaming entities.
         */
        @Throws(IOException::class)
        private fun consume(entity: HttpEntity?) {
            if (entity != null && entity.isStreaming) {
                entity.content?.close()
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment