Last active
January 9, 2024 02:15
-
-
Save denvaar/66721b7a2f54f90592a509d29f57f831 to your computer and use it in GitHub Desktop.
Dependency free presigned S3 links
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule S3Downloads do | |
@moduledoc """ | |
Use at your own risk. | |
Code heavily borrowed from: | |
- https://github.com/ex-aws/ex_aws/blob/main/lib/ex_aws/auth.ex | |
- https://gist.github.com/chrismccord/37862f1f8b1f5148644b75d20d1cb073 | |
""" | |
@doc """ | |
Generate a "presigned" url for an object in S3. | |
Follows "AWS Signature Version 4" for Query Parameters. | |
https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html | |
eg. | |
S3Downloads.get_presigned_url( | |
%{region: "us-west-1", bucket: "example-bucket", access_key_id: "shhhh", secret_access_key: "sshhhh"}, | |
"object/key/here.txt", | |
1, | |
[{"response-content-disposition", ~s(attachment; filename="file.pdf")}] | |
) | |
""" | |
def get_presigned_url(config, object_key, expires_in_minutes, extra_headers \\ []) do | |
with %{ | |
bucket: bucket, | |
region: region, | |
access_key_id: access_key_id, | |
secret_access_key: secret_access_key | |
} <- config do | |
now = DateTime.utc_now() | |
expires_in_seconds = expires_in_minutes * 60 | |
canonical_request = | |
build_canonical_request( | |
access_key_id, | |
bucket, | |
region, | |
object_key, | |
now, | |
expires_in_seconds, | |
extra_headers | |
) | |
string_to_sign = build_string_to_sign(now, region, canonical_request) | |
signature = build_signature(secret_access_key, now, region, string_to_sign) | |
query_params = | |
query_params(access_key_id, region, now, "#{expires_in_seconds}", extra_headers) | |
"https://#{bucket}.s3.amazonaws.com/#{encode_key(object_key)}?#{query_params}&X-Amz-Signature=#{signature}" | |
end | |
end | |
def build_canonical_request( | |
access_key_id, | |
bucket, | |
region, | |
object_key, | |
now, | |
expires_in, | |
extra_headers | |
) do | |
http_verb = "GET" | |
canonical_uri = "/" <> encode_key(object_key) | |
expires_in = "#{expires_in}" | |
canonical_query_string = query_params(access_key_id, region, now, expires_in, extra_headers) | |
canonical_headers = "host:#{bucket}.s3.amazonaws.com" | |
signed_headers = "host" | |
[ | |
http_verb, | |
"\n", | |
canonical_uri, | |
"\n", | |
canonical_query_string, | |
"\n", | |
canonical_headers, | |
"\n", | |
"\n", | |
signed_headers, | |
"\n", | |
"UNSIGNED-PAYLOAD" | |
] | |
|> IO.iodata_to_binary() | |
end | |
def build_string_to_sign(date, region, canonical_request) do | |
[ | |
"AWS4-HMAC-SHA256", | |
"\n", | |
amz_date(date), | |
"\n", | |
scope(date, region), | |
"\n", | |
Base.encode16(sha256_hash(canonical_request), case: :lower) | |
] | |
|> IO.iodata_to_binary() | |
end | |
def build_signature(secret_access_key, date, region, string_to_sign) do | |
date_key = sha256_hmac("AWS4" <> secret_access_key, short_date(date)) | |
date_region_key = sha256_hmac(date_key, region) | |
date_region_service_key = sha256_hmac(date_region_key, "s3") | |
signing_key = sha256_hmac(date_region_service_key, "aws4_request") | |
Base.encode16(sha256_hmac(signing_key, string_to_sign), case: :lower) | |
end | |
def query_params(access_key_id, region, date, expires_in, extra_headers) do | |
[ | |
{"X-Amz-Algorithm", "AWS4-HMAC-SHA256"}, | |
{"X-Amz-Credential", amz_credential(access_key_id, region, date)}, | |
{"X-Amz-Date", amz_date(date)}, | |
{"X-Amz-Expires", expires_in}, | |
{"X-Amz-SignedHeaders", "host"} | |
] | |
|> Kernel.++(extra_headers) | |
|> Enum.map(&qs_encode/1) | |
|> Enum.sort_by(&qs_alphabetical/1) | |
|> Enum.map_join("&", &qs_joiner/1) | |
end | |
def amz_credential(access_key_id, region, date) do | |
"#{access_key_id}/#{scope(date, region)}" | |
end | |
def scope(date, region) do | |
"#{short_date(date)}/#{region}/s3/aws4_request" | |
end | |
def qs_alphabetical({query_parameter, _value}) do | |
query_parameter | |
end | |
def qs_encode({query_parameter, value}) do | |
{uri_encode(query_parameter), uri_encode(value)} | |
end | |
def qs_joiner({query_parameter, value}) do | |
"#{query_parameter}=#{value}" | |
end | |
# ========= | |
# utils | |
# ========= | |
def sha256_hmac(secret, msg), do: :crypto.mac(:hmac, :sha256, secret, msg) | |
def sha256_hash(data), do: :crypto.hash(:sha256, data) | |
def encode_key(s) do | |
# Amazon made their own fun little rules. | |
URI.encode(s, fn c -> | |
should_encode = [ | |
?\s, | |
?(, | |
?) | |
] | |
should_not_encode = [?/] | |
cond do | |
c in should_encode -> false | |
c in should_not_encode -> true | |
true -> URI.char_unescaped?(c) && !URI.char_reserved?(c) | |
end | |
end) | |
end | |
def uri_encode(s) do | |
URI.encode(s, &valid_path_char?/1) | |
end | |
def valid_path_char?(?\s), do: false | |
def valid_path_char?(?/), do: false | |
def valid_path_char?(c) do | |
URI.char_unescaped?(c) && !URI.char_reserved?(c) | |
end | |
def amz_date(time) do | |
time | |
|> NaiveDateTime.to_iso8601() | |
|> String.split(".") | |
|> List.first() | |
|> String.replace("-", "") | |
|> String.replace(":", "") | |
|> Kernel.<>("Z") | |
end | |
def short_date(d) do | |
d | |
|> amz_date() | |
|> String.slice(0..7) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment