In this guide we will build a Python class that can run a function on screenshots from a Twitch stream in real time. One application I explore is computer vision: detecting specific features on screen, such as a "You Win!" screen.
Software Requirements
ffmpeg
A program called ffmpeg is required to process the raw video stream data. It has to be installed separately, and we need to make sure it is added to our PATH.
WikiHow to install ffmpeg for Windows.
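Once installed, you can confirm that ffmpeg is actually reachable from your PATH without leaving Python; this is just the standard library's lookup, so it works on any platform:

import shutil

# Prints the full path to the ffmpeg binary, or None if it is not on your PATH.
print(shutil.which("ffmpeg"))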
Python 3
This guide uses Python 3.
PyPI Requirements
The following PyPI packages are used in this guide. You can install them all at once with pip install streamlink ffmpeg-python pillow opencv-python numpy.
streamlink
ffmpeg-python
pillow
opencv-python
numpy
The Big Idea
In order to run a function on screenshots from a Twitch stream, we first need to get those screenshots. The pipeline looks like this: streamlink resolves a channel name to its HLS stream URL, ffmpeg reads that stream and pipes out raw decoded frames at a rate we choose, and each frame is reshaped into a numpy array that OpenCV (or any other function) can work with.
Code
Now we get into the code; feel free to copy-paste any parts you want. The full code without commentary is available here!
We begin by importing our dependencies.
import streamlink
import ffmpeg
from PIL import Image
import io
import cv2 as cv
import numpy as np
import time
First we need to get the stream's HLS URL. The URL itself will be unreadable to us, but ffmpeg needs it. Change "480p" to whatever resolution you need for your application; you can also use "best" or "worst".
def get_stream_url(user):
    strim = streamlink.streams("https://twitch.tv/%s" % user)
    url = strim["480p"].url
    return url
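As a quick sanity check, you can print the URL for any channel that is currently live (if the channel is offline, streamlink returns no streams and the "480p" lookup will raise a KeyError):

print(get_stream_url("itshafu"))  # a long .m3u8 playlist URL full of tokens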
This next function, get_live_feed, is the meat and potatoes of our work: it probes the stream for its dimensions, asks ffmpeg to emit one raw RGB frame per second over a pipe, and runs our feature checks on every frame that comes through.
def get_live_feed(strim):
    probed = ffmpeg.probe(strim)
    # Stream index 1 held the video track here; this can vary by stream,
    # so check the probe output if the dimensions look wrong.
    height = probed["streams"][1]["height"]
    width = probed["streams"][1]["width"]
    #print("Height: %s, Width: %s" % (height, width))
    in_feed = (
        ffmpeg
        .input(strim)
        .filter("fps", fps=1)  # sample one frame per second
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .run_async(pipe_stdout=True)
    )
    count = 0
    stall = 0
    while True:
        in_bytes = in_feed.stdout.read(width * height * 3)
        if not in_bytes:
            stall += 1
            if stall < 30:  # stall check
                time.sleep(1)
                continue
            else:
                print("No stream data for 30s, closing stream..")
                break
        stall = 0
        in_frame = (
            np
            .frombuffer(in_bytes, np.uint8)
            .reshape([height, width, 3])
        )
        img = Image.fromarray(in_frame[:, :, :].copy())
        #img.save("feed_test2/live%s.jpg" % count)
        imgarray = np.asarray(img)[:, :, ::-1].copy()  # RGB -> BGR, OpenCV's channel order
        count += 1
        # run test on image!
        base = ImageDescriptor(title="liveimage%s" % count, array=imgarray)
        out = False
        for feature in features:
            try:
                out = check_frame(feature, base) if not out else out
            except Exception:  # a bare except here would also swallow KeyboardInterrupt
                pass
        # test_outputs.append(imgarray)
        # print(len(test_outputs))
        # if len(test_outputs) > 20:
        #     return test_outputs
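A note on that read() call: with format='rawvideo' and pix_fmt='rgb24', each frame on the pipe is exactly 3 bytes per pixel, so we must read width * height * 3 bytes at a time to stay frame-aligned. For an 854x480 stream (typical "480p" dimensions, though in practice you should use the probed values):

width, height = 854, 480          # example dimensions; the probe gives the real ones
frame_bytes = width * height * 3  # 3 bytes per pixel: R, G, B
print(frame_bytes)                # 1229760, roughly 1.2 MB per frame

Next comes check_frame, which decides whether a feature image appears in a given frame: it matches SIFT keypoints between the two images with a FLANN-based matcher, keeps the good matches per Lowe's ratio test, and reports a detection once enough good matches survive.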
def check_frame(feature_img, base_img, threshold=50):
    img1 = feature_img.img_.copy()
    img2 = base_img.img_.copy()
    MIN_MATCH_COUNT = threshold
    # SIFT keypoints and descriptors come pre-computed (and cached)
    # from the ImageDescriptor objects
    kp1, des1 = feature_img.kp(), feature_img.des()
    kp2, des2 = base_img.kp(), base_img.des()
    FLANN_INDEX_KDTREE = 1
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=50)
    flann = cv.FlannBasedMatcher(index_params, search_params)
    matches = flann.knnMatch(des1, des2, k=2)
    # store all the good matches as per Lowe's ratio test
    good = []
    for m, n in matches:
        if m.distance < 0.7 * n.distance:
            good.append(m)
    print("comparison %s on %s had %s good matches" % (feature_img.title, base_img.title, len(good)))
    if len(good) > MIN_MATCH_COUNT:
        src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
        dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
        M, mask = cv.findHomography(src_pts, dst_pts, cv.RANSAC, 5.0)
        matchesMask = mask.ravel().tolist()
        h, w, d = img1.shape
        pts = np.float32([[0, 0], [0, h - 1], [w - 1, h - 1], [w - 1, 0]]).reshape(-1, 1, 2)
        dst = cv.perspectiveTransform(pts, M)
        # outline where the feature was found in the base image
        img2 = cv.polylines(img2, [np.int32(dst)], True, 255, 3, cv.LINE_AA)
        draw_params = dict(matchColor=(0, 255, 0),  # draw matches in green color
                           singlePointColor=None,
                           matchesMask=matchesMask,  # draw only inliers
                           flags=2)
        img3 = cv.drawMatches(img1, kp1, img2, kp2, good, None, **draw_params)
        img3_out = Image.fromarray(img3[:, :, ::-1].copy())
        img3_out.save("outputs/%s=%s.jpg" % (base_img.title, feature_img.title))
    else:
        print("Not enough matches are found - {}/{}".format(len(good), MIN_MATCH_COUNT))
    return len(good) > MIN_MATCH_COUNT
check_frame expects ImageDescriptor objects: a small helper class that computes SIFT keypoints and descriptors lazily and caches them, so each image is only processed once no matter how many comparisons it appears in.
class ImageDescriptor:
    def __init__(self, filename=None, title=None, array=None):
        if filename is not None:
            # use the file's base name, minus its extension, as the title
            self.title = filename.split("/")[-1].rsplit(".", 1)[0]
            self.img_ = cv.imread(filename)
        else:
            self.title = title
            self.img_ = array
        self.kp_ = None
        self.des_ = None

    def kp(self):
        if self.kp_ is None:
            sift = cv.SIFT_create()
            self.kp_, self.des_ = sift.detectAndCompute(self.img_, None)
        return self.kp_

    def des(self):
        if self.des_ is None:
            sift = cv.SIFT_create()
            self.kp_, self.des_ = sift.detectAndCompute(self.img_, None)
        return self.des_
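With these pieces in place, a single offline comparison looks like this (using reference images from the features and bases folders defined below):

feature = ImageDescriptor("features/amongus_victory.jpg")
base = ImageDescriptor("bases/hafu_example_win.jpg")
if check_frame(feature, base):
    print("victory screen detected")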
All that remains is wiring everything together. First, the reference images for the features we want to detect (Among Us victory, defeat, and lobby screens):
features = [
    ImageDescriptor("features/amongus_defeat.jpg"),
    ImageDescriptor("features/amongus_victory.jpg"),
    ImageDescriptor("features/amongus_lobby.jpg"),
    ImageDescriptor("features/amongus_lobby1.jpg"),
    ImageDescriptor("features/amongus_lobby3.jpg"),
    ImageDescriptor("features/amongus_lobby4.jpg")
]
For offline testing, ex_bases pairs saved example frames with whether they should produce a match; the commented-out loop at the end runs the classifier over them:
ex_bases = [
    (ImageDescriptor('bases/hafu_example_win.jpg'), True),
    (ImageDescriptor('bases/hafu_example_loss.jpg'), True),
    (ImageDescriptor('bases/hafu_example_lobby.jpg'), True),
    (ImageDescriptor('bases/hafu_example_lobby2.jpg'), True),
    (ImageDescriptor('bases/hafu_example_map.jpg'), False),
    (ImageDescriptor('bases/hafu_example_tablet.jpg'), False),
    (ImageDescriptor('bases/hafu_example_vote.jpg'), False),
    (ImageDescriptor('bases/hafu_example_menu.jpg'), False),
]
Finally, point it at a live channel and let it run:
sstrim = get_stream_url("itshafu")
get_live_feed(sstrim)
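One practical note: run_async returns a subprocess handle, so if you want to stop the feed from outside rather than waiting on the 30-second stall check, you can kill ffmpeg directly. A minimal sketch, assuming get_live_feed were changed to return in_feed instead of looping forever:

in_feed.terminate()  # ask the ffmpeg process to exit
in_feed.wait()       # reap the process so it does not linger as a zombie

And for reference, here is the offline test loop used against ex_bases during development: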
# for base in ex_bases:
#     print("------------------------")
#     out = False
#     for feature in features:
#         print("comparing %s on %s" % (feature.title, base[0].title))
#         start = time.time()
#         out = check_frame(feature, base[0]) if not out else out
#         print("took %ss" % (time.time() - start))
#         if out:  # quit early on positive match (but not during testing)
#             pass  # break
#     if out is not base[1]:
#         print("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
#     else:
#         print("base image %s was correctly classified" % base[0].title)