Skip to content

Instantly share code, notes, and snippets.

@ariankordi
Created September 14, 2024 04:19
Show Gist options
  • Save ariankordi/0b990239daa1f69d571c7de3bec57cc4 to your computer and use it in GitHub Desktop.
Save ariankordi/0b990239daa1f69d571c7de3bec57cc4 to your computer and use it in GitHub Desktop.
Scripts to assist in deobfuscating Miitomo/DeNA XOR + LZ4 encryption, with a mitmproxy script, Go reverse proxy, and Python decoding script. (Note: AI slop)
import mitmproxy.http
from mitmproxy import ctx
import lz4.block
import binascii
# NOTE: miitomo common key is '9ec1c78fa2cb34e2bed5691c08432f04'
# Shared key string; build_xor_table() transforms it and prepends the result
# to the per-session ID to form the XOR table.
COMMON_KEY = "9ec1c78fa2cb34e2bed5691c08432f04"
# Name of the cookie that get_session_id() looks up on the response and request.
SESSION_ID_COOKIE_NAME = "player_session_id"
def transform_common_key(s):
    """
    Derive the key-table bytes for a common key string.

    Each character's code point is added to 0x62, the sum is negated and
    only the low eight bits are kept. Per the original note this mirrors
    FUN_0004a120 in libsakasho.so.

    Args:
        s (str): Input common key string.

    Returns:
        bytes: One transformed byte per input character.
    """
    return bytes(-(ord(ch) + 0x62) & 0xFF for ch in s)
def build_xor_table(common_key, session_id):
    """
    Assemble the XOR table used by conditional_xor().

    The table is the transformed common key followed directly by the
    ASCII bytes of the session ID.

    Args:
        common_key (str): The common key string.
        session_id (str): The session ID string (must be ASCII-encodable).

    Returns:
        bytes: The XOR table.
    """
    table = bytearray(transform_common_key(common_key))
    table += session_id.encode('ascii')
    return bytes(table)
def conditional_xor(data, xor_table, encode=False):
    """
    Obfuscate or de-obfuscate data byte-by-byte against the XOR table.

    The key byte for position i is xor_table[(i + 1) % len(xor_table)].
    - When its low three bits are zero, the data byte is XORed with it.
    - Otherwise the data byte is rotated by that many bits: to the right
      when encoding, to the left when decoding.

    Args:
        data (bytes): The obfuscated or plain data.
        xor_table (bytes): The XOR table.
        encode (bool): True to encode, False (default) to decode.

    Returns:
        bytes: The transformed data, same length as the input.
    """
    table_len = len(xor_table)
    result = bytearray()
    for index, value in enumerate(data):
        key_byte = xor_table[(index + 1) % table_len]
        amount = key_byte & 7
        if not amount:
            # XOR is its own inverse, so both directions share this branch.
            result.append(value ^ key_byte)
            continue
        # An 8-bit right rotation by n equals a left rotation by 8 - n.
        left = (8 - amount) if encode else amount
        result.append(((value << left) | (value >> (8 - left))) & 0xFF)
    return bytes(result)
def decode_varint(data):
    """
    Read a little-endian base-128 varint from the start of the data.

    Args:
        data (bytes): Buffer that begins with the varint.

    Returns:
        tuple: (decoded integer, number of bytes consumed)

    Raises:
        ValueError: If the fifth byte still has its continuation bit set
            ("Varint too long") or the data ends mid-varint
            ("Incomplete varint").
    """
    result = 0
    for position, octet in enumerate(data):
        result |= (octet & 0x7F) << (7 * position)
        if not octet & 0x80:
            return result, position + 1
        # Five 7-bit groups is the limit; a continuation past that is an error.
        if position >= 4:
            raise ValueError("Varint too long")
    raise ValueError("Incomplete varint")
def encode_varint(value):
    """
    Encode a non-negative integer into varint format.

    The value is emitted seven bits at a time, least-significant group
    first; every byte except the last has its high bit set.

    Args:
        value (int): The non-negative integer to encode.

    Returns:
        bytes: The varint-encoded bytes.

    Raises:
        ValueError: If value is negative. An arithmetic right shift of a
            negative integer never reaches zero, so without this check the
            loop would never terminate.
    """
    if value < 0:
        raise ValueError("Cannot encode a negative value as a varint")
    encoded = bytearray()
    while True:
        low_bits = value & 0x7F
        value >>= 7
        if value:
            # More groups follow: set the continuation bit.
            encoded.append(low_bits | 0x80)
        else:
            encoded.append(low_bits)
            return bytes(encoded)
class DenaObfuscation:
    """
    mitmproxy addon that decodes (and can re-encode) obfuscated bodies.

    Bodies are obfuscated as: varint(uncompressed size) + raw LZ4 block,
    passed through conditional_xor() with a table built from the common
    key and the player_session_id cookie.
    """

    def __init__(self):
        # Constant common key
        self.common_key = COMMON_KEY

    def get_session_id(self, flow: mitmproxy.http.HTTPFlow):
        """
        Retrieve the session ID from cookies.

        The response cookies are checked first, then the request cookies.

        Args:
            flow (mitmproxy.http.HTTPFlow): The HTTP flow.

        Returns:
            str or None: The session ID if found, else None.
        """
        for message in (flow.response, flow.request):
            if not message:
                continue
            cookie = message.cookies.get(SESSION_ID_COOKIE_NAME)
            if cookie:
                # A cookie may come back as a tuple; the value is then the
                # first element.
                return cookie[0] if isinstance(cookie, tuple) else cookie
        return None

    def should_process_flow(self, flow: mitmproxy.http.HTTPFlow) -> bool:
        """
        Determine whether the flow should be (de)obfuscated.

        A flow qualifies when its User-Agent contains "SakashoClient" and
        its path is not exactly "/v1/session".

        Args:
            flow (mitmproxy.http.HTTPFlow): The HTTP flow.

        Returns:
            bool: True if the flow should be processed, otherwise False.
        """
        user_agent = flow.request.headers.get("User-Agent", "")
        if "SakashoClient" not in user_agent:
            return False
        # Skip /v1/session specifically
        if flow.request.path == "/v1/session":
            return False
        return True

    def request(self, flow: mitmproxy.http.HTTPFlow):
        """
        Handle the HTTP request.

        If the request body starts with "{" (i.e. it looks like plain
        JSON), it is encoded before being forwarded.
        """
        session_id = self.get_session_id(flow)
        if not session_id or not self.should_process_flow(flow):
            return
        # Build XOR table
        xor_table = build_xor_table(self.common_key, session_id)
        if not xor_table:
            ctx.log.error("XOR table is empty. Skipping flow.")
            return
        # Store XOR table in flow metadata so the encode_* helpers can reuse it
        flow.metadata['xor_table'] = xor_table
        if flow.request.content and flow.request.content.startswith(b'{'):
            try:
                flow.request.content = self.encode_content(flow.request.content, xor_table)
                ctx.log.info(f"Encoded request for flow {flow.id}")
            except Exception as e:
                ctx.log.error(f"Error encoding request for flow {flow.id}: {e}")

    def response(self, flow: mitmproxy.http.HTTPFlow):
        """
        Handle the HTTP response.

        Decodes the response body and then the request body in place. Each
        successful decode is recorded in flow.metadata ('decoded_response'
        / 'decoded_request') so encode_response() / encode_request() know
        there is something to re-encode.
        """
        session_id = self.get_session_id(flow)
        if not session_id or not self.should_process_flow(flow):
            return
        # Build XOR table
        xor_table = build_xor_table(self.common_key, session_id)
        if not xor_table:
            ctx.log.error("XOR table is empty. Skipping flow.")
            return
        # Store XOR table in flow metadata
        flow.metadata['xor_table'] = xor_table
        if flow.response.content:
            try:
                decoded = self.decode_content(flow.response.content, xor_table)
                # Replace the response content with decompressed data
                flow.response.content = decoded["decompressed_data"]
                flow.response.headers['Content-Type'] = 'application/json; charset=UTF-8'
                # Mark that the response has been decoded
                flow.metadata['decoded_response'] = decoded
                ctx.log.info(f"Decoded response for flow {flow.request.url} ({flow.id})")
            except Exception as e:
                ctx.log.warn(f"Error decoding response for flow {flow.request.url} ({flow.id}): {e}")
        if flow.request.content:
            # Decode the request body now that the response is done
            try:
                decoded = self.decode_content(flow.request.content, xor_table)
                # Replace the request content with decompressed data
                flow.request.content = decoded["decompressed_data"]
                # Mark that the request has been decoded; encode_request()
                # checks for this key.
                flow.metadata['decoded_request'] = decoded
                ctx.log.info(f"Decoded request for flow {flow.id} after response done")
            except Exception as e:
                ctx.log.error(f"Error decoding request for flow {flow.id} after response done: {e}")

    def decode_content(self, content: bytes, xor_table: bytes):
        """
        Reverse encode_content(): de-obfuscate, read the size varint and
        LZ4-decompress the remainder.

        Args:
            content (bytes): The obfuscated body.
            xor_table (bytes): The XOR table.

        Returns:
            dict: Keys "varint" (declared uncompressed size),
            "varint_length" (bytes the varint occupied),
            "compressed_data" and "decompressed_data".

        Raises:
            ValueError: If the varint is malformed.
            Exception: Whatever lz4.block.decompress raises on bad input.
        """
        data_after_xor = conditional_xor(content, xor_table, encode=False)
        varint, varint_length = decode_varint(data_after_xor)
        compressed_data = data_after_xor[varint_length:]
        decompressed_data = lz4.block.decompress(compressed_data, uncompressed_size=varint)
        return {
            "varint": varint,
            "varint_length": varint_length,
            "compressed_data": compressed_data,
            "decompressed_data": decompressed_data
        }

    def encode_content(self, content: bytes, xor_table: bytes):
        """
        Compress and obfuscate content using the provided XOR table.

        Args:
            content (bytes): The plain data to encode.
            xor_table (bytes): The XOR table.

        Returns:
            bytes: The encoded data.
        """
        # Compress using LZ4
        compressed_data = lz4.block.compress(content)
        # Encode varint (size of decompressed data)
        varint = encode_varint(len(content))
        # lz4.block.compress() prepends a 4-byte size header by default; the
        # wire format carries the size as a varint instead, so drop it.
        data_to_encode = varint + compressed_data[4:]
        # Apply conditional XOR encoding
        return conditional_xor(data_to_encode, xor_table, encode=True)

    def encode_request(self, flow: mitmproxy.http.HTTPFlow):
        """
        Encode the modified request content before sending it to the server.

        Does nothing unless response() previously decoded the request body.
        """
        if 'decoded_request' in flow.metadata:
            xor_table = flow.metadata['xor_table']
            decoded_data = flow.request.content
            try:
                flow.request.content = self.encode_content(decoded_data, xor_table)
                ctx.log.info(f"Encoded modified request for flow {flow.request.url} ({flow.id})")
            except Exception as e:
                ctx.log.error(f"Error encoding modified request for flow {flow.request.url} ({flow.id}): {e}")

    def encode_response(self, flow: mitmproxy.http.HTTPFlow):
        """
        Encode the modified response content before sending it to the client.

        Does nothing unless response() previously decoded the response body.
        """
        if 'decoded_response' in flow.metadata:
            xor_table = flow.metadata['xor_table']
            decoded_data = flow.response.content
            try:
                flow.response.content = self.encode_content(decoded_data, xor_table)
                ctx.log.info(f"Encoded modified response for flow {flow.request.url} ({flow.id})")
            except Exception as e:
                # Include the exception text, matching encode_request().
                ctx.log.error(f"Error encoding modified response for flow {flow.request.url} ({flow.id}): {e}")
# Module-level addon list containing a single DenaObfuscation instance.
addons = [
DenaObfuscation()
]
import argparse
import sys
import lz4.block
# NOTE: miitomo common key is 9ec1c78fa2cb34e2bed5691c08432f04
# session id is player_session_id cookie
def transform_common_key(s):
    """
    Turn the common key string into its key-table bytes.

    For every character, 0x62 and the character's code point are
    subtracted from zero and the result is masked to one byte. The
    original note attributes this to FUN_0004a120 in libcocos2dcpp.so.

    Args:
        s (str): Input common key string.

    Returns:
        bytes: Processed bytes, one per input character.
    """
    transformed = bytearray()
    for character in s:
        transformed.append((-0x62 - ord(character)) & 0xFF)
    return bytes(transformed)
def build_xor_table(common_key, session_id):
    """
    Produce the XOR table: transformed common key, then the session ID.

    Args:
        common_key (str): The common key string.
        session_id (str): The session ID string (must be ASCII-encodable).

    Returns:
        bytes: The XOR table.
    """
    key_part = transform_common_key(common_key)
    session_part = session_id.encode('ascii')
    return b"".join((key_part, session_part))
def conditional_xor(data, xor_table):
    """
    De-obfuscate data byte-by-byte against the XOR table.

    The key byte for position i is xor_table[(i + 1) % len(xor_table)].
    - When its low three bits are zero, the data byte is XORed with it.
    - Otherwise the data byte is rotated left by that many bits.

    Args:
        data (bytes): The obfuscated data.
        xor_table (bytes): The XOR table.

    Returns:
        bytes: The de-obfuscated data, same length as the input.
    """
    table_len = len(xor_table)

    def unscramble(position, value):
        key_byte = xor_table[(position + 1) % table_len]
        rotation = key_byte & 7
        if rotation == 0:
            return value ^ key_byte
        return ((value << rotation) | (value >> (8 - rotation))) & 0xFF

    return bytes(unscramble(position, value) for position, value in enumerate(data))
def decode_varint(data):
    """
    Parse the varint that prefixes the data.

    Args:
        data (bytes): The data containing the varint at the start.

    Returns:
        tuple: (decoded integer, number of bytes consumed)

    Raises:
        ValueError: "Varint too long" if five bytes all carry the
            continuation bit; "Incomplete varint" if the data runs out
            before a terminating byte.
    """
    value = 0
    consumed = 0
    for byte in data:
        value |= (byte & 0x7F) << (7 * consumed)
        consumed += 1
        if not byte & 0x80:
            return value, consumed
        if consumed >= 5:
            raise ValueError("Varint too long")
    raise ValueError("Incomplete varint")
def main():
    """
    Command-line entry point: de-obfuscate one file and print the result.

    Steps: build the XOR table from --common-key and --session-id, read
    the input file, undo the conditional XOR, read the size varint,
    LZ4-decompress the remainder and print it (as UTF-8 text when
    possible, otherwise as hex).

    Results are written to stdout; every diagnostic goes to stderr so
    that redirected output contains only decoded data. Exits with status
    1 on any failure.
    """
    # Set up argument parsing
    parser = argparse.ArgumentParser(description='Deobfuscate data using common key and session ID.')
    parser.add_argument('--common-key', required=True, help='Common key string from setCommonKey.')
    parser.add_argument('--session-id', required=True, help='Session ID string from updateSessionId.')
    parser.add_argument('input_file', help='Path to the obfuscated data file.')
    args = parser.parse_args()

    # Build the XOR table
    try:
        xor_table = build_xor_table(args.common_key, args.session_id)
    except UnicodeEncodeError as e:
        # build_xor_table() encodes the session ID as ASCII.
        print(f"Error: session ID must be ASCII: {e}", file=sys.stderr)
        sys.exit(1)
    if not xor_table:
        print("Error: XOR table is empty. Check the common key and session ID inputs.", file=sys.stderr)
        sys.exit(1)

    # Read the obfuscated data
    try:
        with open(args.input_file, 'rb') as f:
            obfuscated_data = f.read()
    except OSError as e:
        print(f"Error reading input file: {e}", file=sys.stderr)
        sys.exit(1)

    # Apply the conditional XOR operation
    data_after_xor = conditional_xor(obfuscated_data, xor_table)

    # Decode varint (an empty input file ends up here as "Incomplete varint")
    try:
        varint, varint_length = decode_varint(data_after_xor)
    except ValueError as e:
        print(f"Error decoding varint: {e}", file=sys.stderr)
        sys.exit(1)
    print("=== Varint (Decompressed Size) ===")
    print(varint)
    print(f"Bytes consumed for varint: {varint_length}")

    # Extract compressed data
    compressed_data = data_after_xor[varint_length:]

    # Decompress using LZ4
    try:
        decompressed_data = lz4.block.decompress(compressed_data, uncompressed_size=varint)
    except lz4.block.LZ4BlockError as e:
        print(f"Error during LZ4 decompression: {e}", file=sys.stderr)
        # Dump the compressed data for inspection
        print("\nCompressed Data (Hex):", file=sys.stderr)
        print(compressed_data.hex(), file=sys.stderr)
        sys.exit(1)

    # Print the decompressed data
    print("\n=== Decompressed Data ===")
    # Attempt to decode as UTF-8; if that fails, print as hex
    try:
        print(decompressed_data.decode('utf-8'))
    except UnicodeDecodeError:
        print(decompressed_data.hex())
# Run the command-line decoder only when this file is executed as a script.
if __name__ == "__main__":
main()
package main
import (
"bytes"
"crypto/tls"
"flag"
"io"
"log"
"net/http"
"net/http/httputil"
"net/http/httptest"
"net/url"
"strings"
//"strconv"
// for access logs
"fmt"
"net"
"os"
"time"
"github.com/pierrec/lz4"
)
// commonKey is the fixed key string that proxyHandler passes to buildXorTable
// together with the player_session_id cookie value.
const commonKey = "9ec1c78fa2cb34e2bed5691c08432f04"
// transformCommonKey derives the key-table bytes for a common key string:
// each character's code point is added to 0x62, the sum is negated, and the
// low eight bits are kept. Per the original note this mirrors FUN_0004a120 in
// libsakasho.so. The output has len(s) bytes, indexed by byte offset in s.
func transformCommonKey(s string) []byte {
	out := make([]byte, len(s))
	for pos, r := range s {
		// Converting the int to byte truncates to the low eight bits.
		out[pos] = byte(-(int(r) + 0x62))
	}
	return out
}
// buildXorTable returns the XOR table: the transformed common key followed
// directly by the raw bytes of the session ID.
func buildXorTable(commonKey string, sessionID string) []byte {
	keyPart := transformCommonKey(commonKey)
	table := make([]byte, 0, len(keyPart)+len(sessionID))
	table = append(table, keyPart...)
	table = append(table, sessionID...)
	return table
}
// conditionalXOR obfuscates (encode == true) or de-obfuscates (encode == false)
// data byte-by-byte. The key byte for position i is xorTable[(i+1) % len(xorTable)]:
//   - if its low three bits are zero, the data byte is XORed with it;
//   - otherwise the data byte is rotated by that many bits, to the right when
//     encoding and to the left when decoding.
//
// The result is a new slice of the same length as data.
func conditionalXOR(data []byte, xorTable []byte, encode bool) []byte {
	result := make([]byte, len(data))
	n := len(xorTable)
	for idx, value := range data {
		key := xorTable[(idx+1)%n]
		rot := key & 7
		switch {
		case rot == 0:
			// XOR is its own inverse, so both directions share this case.
			result[idx] = value ^ key
		case encode:
			result[idx] = value>>rot | value<<(8-rot)
		default:
			result[idx] = value<<rot | value>>(8-rot)
		}
	}
	return result
}
// decodeVarint reads a little-endian base-128 varint from the start of data
// and returns the value and the number of bytes it occupied.
//
// It panics with "Varint too long" when five bytes all carry the continuation
// bit, and with "Incomplete varint" when data ends before a terminating byte.
func decodeVarint(data []byte) (int, int) {
	result := 0
	for idx, b := range data {
		result |= int(b&0x7F) << (7 * idx)
		if b&0x80 == 0 {
			return result, idx + 1
		}
		// Five 7-bit groups is the limit.
		if idx >= 4 {
			panic("Varint too long")
		}
	}
	panic("Incomplete varint")
}
// encodeVarint encodes value as a varint: seven bits per byte, least
// significant group first, with the high bit set on every byte but the last.
// Callers are expected to pass a non-negative value (the visible caller
// passes a length).
func encodeVarint(value int) []byte {
	var out []byte
	for value>>7 != 0 {
		out = append(out, byte(value&0x7F)|0x80)
		value >>= 7
	}
	return append(out, byte(value&0x7F))
}
// decodeAndDecompress takes the request body, decodes it using XOR,
// extracts the compressed data, decompresses it using LZ4, and returns the result.
func decodeAndDecompress(body io.Reader, xorTable []byte) ([]byte, error) {
// Stream read
buf := new(bytes.Buffer)
_, err := io.Copy(buf, body)
if err != nil {
return nil, err
}
data := buf.Bytes()
dataAfterXOR := conditionalXOR(data, xorTable, false)
// Decode varint
varint, varintLength := decodeVarint(dataAfterXOR)
compressedData := dataAfterXOR[varintLength:]
// Decompress the data using LZ4
decompressed := make([]byte, varint)
decompressedLen, err := lz4.UncompressBlock(compressedData, decompressed)
if err != nil {
return nil, err
}
return decompressed[:decompressedLen], nil
}
// compressAndEncode is the inverse of decodeAndDecompress: it LZ4-compresses
// body, prefixes the result with a varint of the uncompressed length, and
// obfuscates the whole thing with conditionalXOR in encode mode.
func compressAndEncode(body []byte, xorTable []byte) ([]byte, error) {
	scratch := make([]byte, lz4.CompressBlockBound(len(body)))
	// NOTE(review): some pierrec/lz4 releases document a returned size of 0 for
	// incompressible input; confirm against the version in use.
	n, err := lz4.CompressBlock(body, scratch, nil)
	if err != nil {
		return nil, err
	}

	// Varint of the decompressed size, then the compressed block.
	payload := encodeVarint(len(body))
	payload = append(payload, scratch[:n]...)

	return conditionalXOR(payload, xorTable, true), nil
}
// proxyHandler returns a handler that decodes obfuscated client requests,
// forwards them in plain form to upstreamURL, and re-encodes the upstream
// response for the client.
//
// Requests are passed through untouched when the player_session_id cookie is
// missing, the User-Agent does not contain "SakashoClient", or the path is
// /v1/session. certFile and keyFile are accepted but not used here.
func proxyHandler(upstreamURL *url.URL, certFile, keyFile string) func(w http.ResponseWriter, r *http.Request) {
	upstreamProxy := httputil.NewSingleHostReverseProxy(upstreamURL)
	upstreamProxy.Transport = &http.Transport{
		// NOTE(security): upstream certificate verification is disabled.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		Proxy:           http.ProxyFromEnvironment, // Allow proxy config from env
	}

	// Override the Director so the request targets the upstream host; the
	// incoming path and query are left as they are.
	upstreamProxy.Director = func(req *http.Request) {
		req.URL.Scheme = upstreamURL.Scheme
		req.URL.Host = upstreamURL.Host
		req.Host = upstreamURL.Host
		// Set the Host header explicitly
		req.Header.Set("Host", upstreamURL.Host)
	}

	return func(w http.ResponseWriter, r *http.Request) {
		sessionIDCookie, _ := r.Cookie("player_session_id")
		userAgent := r.Header.Get("User-Agent")

		// Bypass de/obfuscation logic if sessionID is missing, User-Agent does
		// NOT contain "SakashoClient", or it's /v1/session
		if sessionIDCookie == nil || !strings.Contains(userAgent, "SakashoClient") || r.URL.Path == "/v1/session" {
			upstreamProxy.ServeHTTP(w, r)
			return
		}

		xorTable := buildXorTable(commonKey, sessionIDCookie.Value)

		if r.Body != nil {
			buf := new(bytes.Buffer)
			if _, err := io.Copy(buf, r.Body); err != nil && err != io.EOF {
				http.Error(w, "Failed to read request body", http.StatusInternalServerError)
				return
			}

			// Only decode if the body is not empty
			if buf.Len() > 0 {
				decodedBody, err := decodeAndDecompress(buf, xorTable)
				if err != nil {
					http.Error(w, "Failed to decode request", http.StatusInternalServerError)
					return
				}
				// Replace body with decoded data
				r.Body = io.NopCloser(bytes.NewReader(decodedBody))
				// The body size has changed, so the old length no longer applies.
				r.Header.Del("Content-Length")
				r.ContentLength = -1
			}
		}

		// Point the request at the upstream server
		r.Host = upstreamURL.Host
		r.URL.Scheme = upstreamURL.Scheme
		r.URL.Host = upstreamURL.Host
		r.Header.Set("Host", upstreamURL.Host)

		// Capture the upstream response so its body can be re-encoded
		rec := httptest.NewRecorder()
		upstreamProxy.ServeHTTP(rec, r)

		// Pass through upstream headers to client
		for k, v := range rec.Header() {
			w.Header()[k] = v
		}

		// Nothing to re-encode: still forward the upstream status code, which
		// would otherwise be replaced by an implicit 200.
		if rec.Body == nil || rec.Body.Len() == 0 {
			w.WriteHeader(rec.Code)
			return
		}

		// The upstream Content-Length describes the plain body, not the
		// re-encoded one; drop it before anything is written.
		w.Header().Del("Content-Length")

		encodedBody, err := compressAndEncode(rec.Body.Bytes(), xorTable)
		if err != nil {
			http.Error(w, "Failed to encode response", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Length", fmt.Sprint(len(encodedBody)))
		w.WriteHeader(rec.Code)
		w.Write(encodedBody)
	}
}
func main() {
// Argument parsing for certificate, upstream, and hostname to client
var upstreamAddr string
var hostname string
var certFile string
var keyFile string
flag.StringVar(&upstreamAddr, "upstream", "https://upstream.server", "Upstream server URL")
flag.StringVar(&hostname, "hostname", "localhost:8080", "Hostname and port for the client to connect to")
flag.StringVar(&certFile, "cert", "cert.pem", "TLS certificate file")
flag.StringVar(&keyFile, "key", "key.pem", "TLS key file")
flag.Parse()
upstreamURL, err := url.Parse(upstreamAddr)
if err != nil {
log.Fatalf("Invalid upstream URL: %v", err)
}
http.Handle("/", logRequest(http.HandlerFunc(proxyHandler(upstreamURL, certFile, keyFile))))
log.Printf("Starting proxy server on %s", hostname)
log.Fatal(http.ListenAndServeTLS(hostname, certFile, keyFile, nil))
}
// fancy access logs
// Escape sequences used by logRequest and colorQueryParameters when
// isColorTerminal() reports a color-capable terminal.
const (
// ANSI color codes for access logs
ANSIReset = "\033[0m"
ANSIRed = "\033[31m"
ANSIGreen = "\033[32m"
ANSIYellow = "\033[33m"
ANSIPurple = "\033[35m"
ANSIFaint = "\033[2m"
// ANSIBold is not referenced by the code visible in this file.
ANSIBold = "\033[1m"
ANSICyan = "\033[36m"
// ANSIBgRed and ANSIBgBlue are not referenced by the code visible in this file.
ANSIBgRed = "\033[101m"
ANSIBgBlue = "\033[104m"
// ANSIBgMagenta marks the client IP when X-Forwarded-For is present.
ANSIBgMagenta = "\033[105m"
)
func isColorTerminal() bool {
// NOTE: hack
return os.Getenv("TERM") == "xterm-256color"
}
// getClientIP retrieves the client IP address from the request,
// considering the X-Forwarded-For header if present.
func getClientIP(r *http.Request) string {
host, _, _ := net.SplitHostPort(r.RemoteAddr)
return host
}
// responseWriter is a custom http.ResponseWriter that captures the status code
type responseWriter struct {
http.ResponseWriter
statusCode int
}
// newResponseWriter creates a new responseWriter
func newResponseWriter(w http.ResponseWriter) *responseWriter {
return &responseWriter{w, http.StatusOK}
}
// WriteHeader captures the status code
func (rw *responseWriter) WriteHeader(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(code)
}
// logRequest wraps handler and prints one access-log line per request in an
// Apache/Nginx-like format with the latency appended. When isColorTerminal()
// is true, the line is decorated with ANSI colors.
func logRequest(handler http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		began := time.Now()
		wrapped := newResponseWriter(w)
		handler.ServeHTTP(wrapped, r)

		code := wrapped.statusCode
		elapsed := time.Since(began)
		remote := getClientIP(r)
		stamp := began.Format("02/Jan/2006:15:04:05 -0700")

		if !isColorTerminal() {
			// apache/nginx request format with latency at the end
			fmt.Println(strings.Join([]string{
				remote, " - - [", stamp, "] \"",
				r.Method, " ", r.RequestURI, " ", r.Proto, "\" ",
				fmt.Sprint(code), " ", fmt.Sprint(r.ContentLength), " \"",
				r.Referer(), "\" \"", r.UserAgent(), "\" ",
				fmt.Sprint(elapsed),
			}, ""))
			return
		}

		// Green for success, yellow for 4xx, red for 5xx.
		statusColor := ANSIGreen
		switch {
		case code >= 500:
			statusColor = ANSIRed
		case code >= 400:
			statusColor = ANSIYellow
		}

		latencyColor := getLatencyGradientColor(elapsed)

		// Highlight the client IP when the request carried X-Forwarded-For.
		clientIPColor := ANSICyan
		if r.Header.Get("X-Forwarded-For") != "" {
			clientIPColor = ANSIBgMagenta
		}

		query := r.URL.RawQuery
		if query != "" {
			query = "?" + query
		}
		queryColored := colorQueryParameters(query)

		// so many colors.....
		fmt.Println(strings.Join([]string{
			clientIPColor, remote, ANSIReset,
			" - - [", stamp, "] \"",
			ANSIGreen, r.Method, " ", r.URL.Path, queryColored, " ", ANSIReset,
			ANSIFaint, r.Proto, ANSIReset, "\" ",
			statusColor, fmt.Sprint(code), ANSIReset, " ",
			fmt.Sprint(r.ContentLength), " \"",
			ANSIPurple, r.Referer(), ANSIReset, "\" \"",
			ANSIFaint, r.UserAgent(), ANSIReset, "\" ",
			latencyColor, fmt.Sprint(elapsed), ANSIReset,
		}, ""))
	})
}
// Color ranges for latency gradient
// getLatencyGradientColor indexes this slice by threshold bucket and uses the
// last entry as the fallback for latencies at or above every threshold.
var latencyColors = []string{
"\033[38;5;39m", // Blue
"\033[38;5;51m", // Light blue
"\033[38;5;27m", // Added color (Dark blue)
"\033[38;5;82m", // Green
"\033[38;5;34m", // Added color (Forest green)
"\033[38;5;154m", // Light green
"\033[38;5;220m", // Yellow
"\033[38;5;208m", // Orange
"\033[38;5;198m", // Light red
}
// getLatencyGradientColor picks the latencyColors entry for the first
// millisecond threshold the latency is below; latencies at or above every
// threshold get the last color.
func getLatencyGradientColor(latency time.Duration) string {
	ms := latency.Milliseconds()

	// Upper bounds (exclusive), in milliseconds, of each color bucket.
	upperBounds := [...]int64{40, 60, 85, 100, 150, 230, 400, 600}

	bucket := 0
	for bucket < len(upperBounds) && ms >= upperBounds[bucket] {
		bucket++
	}
	if bucket == len(upperBounds) {
		return latencyColors[len(latencyColors)-1]
	}
	return latencyColors[bucket]
}
// colorQueryParameters colors the query parameters
func colorQueryParameters(query string) string {
if query == "" {
return ""
}
// NOTE: the question mark and first query key are colored the same
params := strings.Split(query, "&")
var coloredParams []string
for _, param := range params {
keyValue := strings.Split(param, "=")
if len(keyValue) == 2 {
coloredParams = append(coloredParams, fmt.Sprintf("%s%s%s=%s%s%s", ANSICyan, keyValue[0], ANSIReset, ANSIYellow, keyValue[1], ANSIReset))
} else {
coloredParams = append(coloredParams, param)
}
}
return strings.Join(coloredParams, "&")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment