Skip to content

Instantly share code, notes, and snippets.

@justfoolingaround
Created August 16, 2024 15:43
Show Gist options
  • Save justfoolingaround/fbb21c7e9cee4acadb75781551f7305a to your computer and use it in GitHub Desktop.
Save justfoolingaround/fbb21c7e9cee4acadb75781551f7305a to your computer and use it in GitHub Desktop.
A Twitch streaming client that tries to stay as real-time as possible.
"""
In short, the goal of this project is to
stay ahead of your friends during a live
watch party to avoid any spoiling reactions
when the stakes are real high.
This means that you'll be the one spoiling
your friends though, so keep up the act!
---
HANGING PRINT STATEMENTS ARE INTENDED. THEY
ARE TO BE REMOVED WHEN I DISCOVER A FIX FOR
THOSE.
I NEVER ENCOUNTERED ADS OR A DISCONTINUOUS
STREAM, IT'S NOT THAT I'M LAZY.
---
Piper.py by @justfoolingaround (devkr)
Requirements: python-requests, mpv in PATH
A Twitch streaming client that tries to stay
as real-time as possible.
Even Twitch's fastest client has a live edge
of 2, `piper` tries to keep it at about 1.
A to-do may be to potentially add certain
level of dynamic edge management so that we
can handle unexpectancies easily.
Handling `low_latency` case could potentially
be done differently in order to enforce any
live edges but this may sacrifice the future
dynamicity of `piper`'s live edge.
"""
import logging
import subprocess
import time
import urllib.parse
import requests
TAG_NAME_MAPPING = {
"#EXT-X-STREAM-INF": "stream_information",
"#EXT-X-MEDIA": "media",
"#EXT-X-TARGETDURATION": "target_duration",
"#EXT-X-VERSION": "version",
"#EXT-X-MEDIA-SEQUENCE": "media_sequence",
"#EXTINF": "information",
"#EXT-X-TWITCH-PREFETCH": "low_latency",
}
def parse_stream_attrs(expression: str):
attrs = {}
args = []
if ":" not in expression:
return (expression, attrs, args)
tag, attributes = expression.split(":", 1)
i = 0
n = len(attributes)
while i < n:
key = ""
value = ""
while i < n and attributes[i] != "=":
key += attributes[i]
i += 1
i += 1
if i < n and attributes[i] == '"':
i += 1
while i < n and attributes[i] != '"':
value += attributes[i]
i += 1
i += 1
else:
while i < n and attributes[i] != ",":
value += attributes[i]
i += 1
if not value:
args.append(key)
else:
attrs.update({key.lower(): value})
i += 1
if tag in TAG_NAME_MAPPING:
tag = TAG_NAME_MAPPING[tag]
return (tag, attrs, args)
class Twitch:
CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
GQL_URL = "https://gql.twitch.tv/gql"
def __init__(self, session: requests.Session):
self.session = session
self.logger = logging.getLogger("twitch")
def iter_channels(self, query: str, *, must_be_live=True, must_not_be_dj=True):
response = self.session.post(
Twitch.GQL_URL,
headers={
"Client-Id": self.CLIENT_ID,
},
json=[
{
"operationName": "SearchTray_SearchSuggestions",
"variables": {
"queryFragment": query,
"withOfflineChannelContent": not must_be_live,
"includeIsDJ": not must_not_be_dj,
},
"extensions": {
"persistedQuery": {
"version": 1,
"sha256Hash": "2749d8bc89a2ddd37518e23742a4287becd3064c40465d8b57317cabd0efe096",
}
},
}
],
)
for edge in response.json()[0]["data"]["searchSuggestions"]["edges"]:
if edge is None or edge["node"]["content"] is None:
continue
yield edge["node"]["content"]["login"]
def iter_stream_urls(self, username: str):
response = self.session.post(
Twitch.GQL_URL,
headers={
"Client-Id": self.CLIENT_ID,
},
json={
"operationName": "PlaybackAccessToken_Template",
"query": 'query PlaybackAccessToken_Template($channel: String!) { streamPlaybackAccessToken(channelName: $channel, params: {platform: "ios", playerType: "site" }) { value signature authorization { isForbidden forbiddenReasonCode } __typename }}',
"variables": {
"channel": username,
},
},
)
access_token = response.json()["data"]["streamPlaybackAccessToken"]
token = access_token["value"]
signature = access_token["signature"]
hls_playlist_url = urllib.parse.urlunparse(
(
"https",
"usher.ttvnw.net",
f"/api/channel/hls/{username.lower()}.m3u8",
"",
urllib.parse.urlencode(
{
"allow_source": "true",
"sig": signature,
"token": token,
"fast_bread": "true",
}
),
"",
)
)
self.logger.debug("%r: %s", username, hls_playlist_url)
stream_attrs = None
for media in self.session.get(hls_playlist_url).text.split("\n"):
if media[:12] == "#EXT-X-MEDIA":
stream_attrs = {}
if stream_attrs is None:
continue
if media[:1] == "#":
type_of, attrs, _ = parse_stream_attrs(media)
stream_attrs.update({type_of: attrs})
else:
stream_attrs.update({"url": media})
yield stream_attrs
stream_attrs = None
class TwitchStreamIterator:
def __init__(self, session: requests.Session, stream: str):
self.session = session
self.stream = stream
self.current_sequence = None
def iter_from_sequence(self, our_sequence=None):
try:
response = self.session.get(
self.stream,
headers={
"Origin": "https://www.twitch.tv",
"Referer": "https://www.twitch.tv/",
},
)
response.raise_for_status()
except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError):
return
next_has_stream = False
for media in response.text.split("\n"):
if media[:1] == "#":
tag, _, args = parse_stream_attrs(media)
if tag == "media_sequence":
self.current_sequence = int(args[0])
if our_sequence is None:
our_sequence = self.current_sequence
if tag == "information":
if args[0][-4:] == "live":
next_has_stream = True
else:
print("Encountering ads, trying to skip.")
if tag == "low_latency":
yield (our_sequence, args[0])
our_sequence += 1
self.current_sequence += 1
else:
if next_has_stream:
if self.current_sequence >= our_sequence:
our_sequence = max(our_sequence, self.current_sequence)
yield (our_sequence, media)
our_sequence += 1
self.current_sequence += 1
next_has_stream = False
def iter_indefinitely(self, last: int = None, *, max_poll: int = None):
delta = time.perf_counter()
sequence = list(self.iter_from_sequence())
if last is not None:
sequence = sequence[-last:]
n = 0
for n, stream in sequence:
yield (n, stream)
our_sequence = n + 1
while True:
if max_poll is not None:
sleep_for = max(0, max_poll - (time.perf_counter() - delta))
time.sleep(sleep_for)
delta = time.perf_counter()
sequence = list(self.iter_from_sequence(our_sequence))
if sequence:
last_n = sequence[-1][0]
diff = last_n - our_sequence
if last is not None and diff > last:
print(
f"Exceeded delta ({diff}>{last}), seeking near {last} with respect to the end."
)
sequence = sequence[-last:]
if our_sequence > last_n + 1 + len(sequence):
print(
f"Discontinuity detected (if persistent, hike the live edge to {len(sequence)=}):",
our_sequence,
last_n + 1,
)
our_sequence = last_n + 1
for n, stream in sequence:
yield (n, stream)
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} [LIVE_USER]", file=sys.__stderr__)
exit(1)
user = sys.argv[1]
session = requests.Session()
twitch = Twitch(session)
options = list(twitch.iter_channels(user))
if not options:
print(
"Either that user does not exist or is not live at the moment.",
file=sys.__stderr__,
)
exit(1)
stream_user = options[0]
streams = list(twitch.iter_stream_urls(stream_user))
if not streams:
print(
"No streams found. The code may be broken.",
file=sys.__stderr__,
)
exit(1)
best = streams[0]
print(f"Playing a {best['media']['name']} stream.")
mpv = subprocess.Popen(
[
"mpv",
"-",
"--cache=no",
"--profile=low-latency",
"--untimed",
f"--force-media-title=You are watching: {stream_user}",
],
stdin=subprocess.PIPE,
)
streamer = TwitchStreamIterator(
session,
best["url"],
)
chunk_size = 8192
for n, stream in streamer.iter_indefinitely(last=1):
playing = True
while playing:
try:
response = session.get(
stream,
stream=True,
)
response.raise_for_status()
for chunk in response.iter_content(chunk_size):
mpv.stdin.write(chunk)
mpv.stdin.flush()
playing = False
except requests.exceptions.ConnectionError:
print("Twitch is being naughty, retrying for stream:", n)
except BrokenPipeError:
print("Cannot stream any data anymore :(.")
exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment