In this guide we will be creating a Python class that can run a function on screenshots from a Twitch stream in real time.
One application I explore is computer vision for the purpose of detecting certain features on screen, such as a “You Win!” screen.

Software Requirements

ffmpeg

A program called ffmpeg is required to process raw video stream data. This means we need to install ffmpeg separately and make sure it is added to our path.
On Windows, see the WikiHow guide on how to install ffmpeg.

Python 3

This guide uses Python 3.

PyPi Requirements

The following PyPI projects are used in this guide. You can install them all at once by running: pip install streamlink ffmpeg-python pillow opencv-python numpy

streamlink
ffmpeg-python
pillow
opencv-python
numpy

The Big Idea

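In order to run a function on screenshots from a Twitch stream, we first need to get those screenshots. We do that in three steps: streamlink gives us the stream's HLS URL, ffmpeg decodes that stream into one raw frame per second, and each frame is then handed to our detection function as an image array.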

Code

Now we get into the code. Feel free to copy and paste any parts you want. The full code without commentary is available here!
We begin by importing our dependencies.

import streamlink
import ffmpeg
from PIL import Image
import io
import cv2 as cv
import numpy as np
import time

First, we need to get the stream’s HLS URL.
The URL itself is not meant to be human-readable, but ffmpeg needs it to fetch the video.
Change the 480p to whatever resolution you need for your application. You can also use best or worst.

def get_stream_url(user, quality="480p"):
    """Return the stream URL of a Twitch channel for the given quality.

    Args:
        user: Twitch channel name, e.g. ``"itshafu"``.
        quality: Key of the stream variant to select from the mapping
            returned by ``streamlink.streams``. Defaults to ``"480p"``;
            ``"best"`` and ``"worst"`` can be used as well.

    Returns:
        The ``url`` attribute of the selected stream, which is what ffmpeg
        is later pointed at.

    Raises:
        KeyError: If no stream is available under ``quality``, for example
            when the channel does not offer that resolution. The message
            lists the qualities that are available.
    """
    strim = streamlink.streams("https://twitch.tv/%s" % user)
    if quality not in strim:
        # Same exception type as the plain dict lookup would raise, but with
        # enough context to pick a quality that actually exists.
        raise KeyError(
            "no %r stream for %s (available: %s)"
            % (quality, user, ", ".join(sorted(strim)) or "none")
        )
    return strim[quality].url

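This function is the meat and potatoes of our work: it starts ffmpeg on the stream, reads one raw frame per second, and runs our feature check on every frame.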

def get_live_feed(strim):
    """Decode a stream at one frame per second and test each frame.

    ffmpeg is started as a subprocess that writes raw ``rgb24`` frames to
    its stdout. Every complete frame is converted to a BGR array, wrapped in
    an ``ImageDescriptor`` and compared against the module-level
    ``features`` list with ``check_frame`` until one feature matches.

    The loop ends after 30 consecutive reads that did not yield a complete
    frame (with a one second pause between attempts). The ffmpeg process is
    always shut down when the function exits, including on errors and on
    Ctrl+C.

    Args:
        strim: URL (or path) of the stream, as accepted by ``ffmpeg.probe``
            and ``ffmpeg.input``.

    Raises:
        LookupError: If the probe result contains no video stream.
    """
    probed = ffmpeg.probe(strim)
    # Do not assume the video track sits at a fixed index; pick the first
    # stream that the probe labels as video.
    video = next(
        (s for s in probed["streams"] if s.get("codec_type") == "video"),
        None,
    )
    if video is None:
        raise LookupError("no video stream found in %s" % strim)
    height = video["height"]
    width = video["width"]
    frame_size = width * height * 3  # rgb24: three bytes per pixel

    in_feed = (
        ffmpeg
            .input(strim)
            .filter("fps", fps=1)
            .output('pipe:', format='rawvideo', pix_fmt='rgb24')
            .run_async(pipe_stdout=True)
    )
    count = 0
    stall = 0
    try:
        while True:
            in_bytes = in_feed.stdout.read(frame_size)
            if len(in_bytes) < frame_size:
                # Nothing (or only a truncated frame) arrived. A partial
                # frame cannot be reshaped, so it is dropped and counted as
                # a stalled read.
                stall += 1
                if stall < 30:  # stall check
                    time.sleep(1)
                    continue
                print("No stream data for 30s, closing stream..")
                break
            stall = 0

            in_frame = (
                np
                    .frombuffer(in_bytes, np.uint8)
                    .reshape([height, width, 3])
            )
            # Reverse the channel axis (RGB -> BGR) and copy so the array is
            # contiguous and writable; this yields the same array the former
            # round-trip through PIL produced.
            imgarray = in_frame[:, :, ::-1].copy()
            count += 1

            # run test on image!
            base = ImageDescriptor(title="liveimage%s" % count, array=imgarray)
            for feature in features:
                try:
                    matched = check_frame(feature, base)
                except Exception as exc:
                    # Best effort: one failing comparison must not stop the
                    # feed, but it should not vanish silently either.
                    print("check_frame failed for %s on %s: %r"
                          % (feature.title, base.title, exc))
                    continue
                if matched:
                    # First positive match wins; the remaining features are
                    # not checked for this frame.
                    break
    finally:
        # Release the pipe and make sure ffmpeg does not outlive this call.
        in_feed.stdout.close()
        in_feed.terminate()
        in_feed.wait()

def check_frame(feature_img, base_img, threshold=50):
    """Report whether ``feature_img`` appears inside ``base_img``.

    SIFT descriptors of both images are matched with a FLANN k-d tree
    matcher (k=2) and filtered with Lowe's ratio test (0.7). The feature
    counts as found when more than ``threshold`` matches survive. On a hit,
    a side-by-side visualisation is written to
    ``outputs/<base title>=<feature title>.jpg``.

    Args:
        feature_img: Object exposing ``title``, ``img_``, ``kp()`` and
            ``des()`` (an ``ImageDescriptor``) for the pattern to look for.
        base_img: Same kind of object for the frame being searched.
        threshold: Number of good matches that must be exceeded.

    Returns:
        ``True`` if more than ``threshold`` good matches were found,
        otherwise ``False``. Images without enough descriptors to match
        yield ``False`` instead of raising.
    """
    MIN_MATCH_COUNT = threshold
    kp1, des1 = feature_img.kp(), feature_img.des()
    kp2, des2 = base_img.kp(), base_img.des()

    # A featureless image has no descriptors, and a k=2 search needs at
    # least two candidates on the train side; there is nothing to match.
    if des1 is None or des2 is None or len(des1) == 0 or len(des2) < 2:
        print("comparison %s on %s skipped: not enough descriptors"
              % (feature_img.title, base_img.title))
        return False

    FLANN_INDEX_KDTREE = 1
    index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
    search_params = dict(checks=50)
    flann = cv.FlannBasedMatcher(index_params, search_params)
    matches = flann.knnMatch(des1, des2, k=2)

    # store all the good matches as per Lowe's ratio test.
    # Entries with fewer than two neighbours cannot be ratio-tested.
    good = [
        pair[0]
        for pair in matches
        if len(pair) == 2 and pair[0].distance < 0.7 * pair[1].distance
    ]
    print("comparison %s on %s had %s good matches" % (feature_img.title, base_img.title, len(good)))

    if len(good) <= MIN_MATCH_COUNT:
        print("Not enough matches are found - {}/{}".format(len(good), MIN_MATCH_COUNT))
        return False

    # Work on copies so the drawing below never alters the cached images.
    img1 = feature_img.img_.copy()
    img2 = base_img.img_.copy()

    src_pts = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1, 1, 2)
    dst_pts = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1, 1, 2)
    M, mask = cv.findHomography(src_pts, dst_pts, cv.RANSAC, 5.0)

    matchesMask = None
    if M is not None:
        # Outline where the feature image projects into the base image.
        matchesMask = mask.ravel().tolist()
        h, w = img1.shape[:2]  # also valid for single-channel images
        pts = np.float32([[0, 0], [0, h - 1], [w - 1, h - 1], [w - 1, 0]]).reshape(-1, 1, 2)
        dst = cv.perspectiveTransform(pts, M)
        img2 = cv.polylines(img2, [np.int32(dst)], True, 255, 3, cv.LINE_AA)
    # else: no homography could be estimated; all good matches are drawn
    # and no outline is added.

    draw_params = dict(matchColor=(0, 255, 0),  # draw matches in green color
                       singlePointColor=None,
                       matchesMask=matchesMask,  # draw only inliers
                       flags=2)
    img3 = cv.drawMatches(img1, kp1, img2, kp2, good, None, **draw_params)
    img3_out = Image.fromarray(img3[:, :, ::-1].copy())

    # Local import: only needed here, to make sure the target folder exists
    # so that a positive match is not lost to a failing save.
    import os
    os.makedirs("outputs", exist_ok=True)
    img3_out.save("outputs/%s=%s.jpg" % ( base_img.title, feature_img.title))
    return True

class ImageDescriptor:
    """An image together with lazily computed SIFT keypoints/descriptors.

    The image comes either from a file (``filename``) or from an in-memory
    array (``title`` + ``array``). Keypoints and descriptors are computed on
    the first call to ``kp()`` or ``des()`` and then reused.

    Attributes are kept public for the matching code:
        title: Display name; for files, the file name without its extension.
        img_: The image as returned by ``cv.imread`` or the array passed in.
            NOTE(review): the ``cv.imread`` result is stored unchecked, so an
            unreadable path only surfaces later, when SIFT is run.
        kp_, des_: Cached SIFT results, ``None`` until computed.
    """

    def __init__(self, filename=None, title=None, array=None):
        """Load ``filename`` if given, otherwise wrap ``array`` as ``title``."""
        if filename is not None:
            self.title = self._title_from_path(filename)
            self.img_ = cv.imread(filename)
        else:
            self.title = title
            self.img_ = array
        self.kp_ = None
        self.des_ = None
        # Separate flag: descriptors may legitimately stay None (presumably
        # for images without keypoints), which must not trigger a recompute
        # on every des() call.
        self._computed = False

    @staticmethod
    def _title_from_path(filename):
        """Return the last ``/``-separated component without its extension.

        ``str.strip(".jpg")`` is not used because it removes any of the
        characters ``.``, ``j``, ``p``, ``g`` from both ends, which turns
        ``"map.jpg"`` into ``"ma"``.
        """
        base = filename.split("/")[-1]
        stem, dot, _ext = base.rpartition(".")
        # Keep the name unchanged when there is no extension to drop or
        # when dropping it would leave an empty title.
        return stem if dot and stem else base

    def _compute(self):
        """Run SIFT once and cache keypoints and descriptors."""
        if not self._computed:
            sift = cv.SIFT_create()
            self.kp_, self.des_ = sift.detectAndCompute(self.img_, None)
            self._computed = True

    def kp(self):
        """Return the SIFT keypoints of the image, computing them if needed."""
        self._compute()
        return self.kp_

    def des(self):
        """Return the SIFT descriptors of the image, computing them if needed."""
        self._compute()
        return self.des_



# commercial_reference = cv.imread('commercial.jpg')
# full_commercial_reference = cv.imread('what.jpeg')
# c_reference = cv.imread('C.jpg')
# twitch_reference = cv.imread("twitch.jpg")
# print("took %ss" % (time.time() - start))
# start = time.time()
# check_frame(twitch_reference, simg)
# print("took %ss" % (time.time() - start))

# Reference screens to look for in every frame, in the order they are tried.
_FEATURE_PATHS = (
    "features/amongus_defeat.jpg",
    "features/amongus_victory.jpg",
    "features/amongus_lobby.jpg",
    "features/amongus_lobby1.jpg",
    "features/amongus_lobby3.jpg",
    "features/amongus_lobby4.jpg",
)
features = [ImageDescriptor(path) for path in _FEATURE_PATHS]

# Example frames paired with the expected classification: True when one of
# the features above should be detected in the frame, False otherwise.
_EXAMPLE_BASES = (
    ('bases/hafu_example_win.jpg', True),
    ('bases/hafu_example_loss.jpg', True),
    ('bases/hafu_example_lobby.jpg', True),
    ('bases/hafu_example_lobby2.jpg', True),
    ('bases/hafu_example_map.jpg', False),
    ('bases/hafu_example_tablet.jpg', False),
    ('bases/hafu_example_vote.jpg', False),
    ('bases/hafu_example_menu.jpg', False),
)
ex_bases = [
    (ImageDescriptor(path), expected)
    for path, expected in _EXAMPLE_BASES
]

if __name__ == "__main__":
    # Only start streaming when this file is executed as a script, so that
    # importing it (for example to reuse check_frame on saved images) does
    # not open a network stream and block in the frame loop.
    sstrim = get_stream_url("itshafu")
    get_live_feed(sstrim)
# for base in ex_bases:
#     print("------------------------")
#     out = False
#     for feature in features:
#         print("comparing %s on %s" % (feature.title, base[0].title))
#         start = time.time()
#         out = check_frame(feature, base[0]) if not out else out
#         print("took %ss" % (time.time() - start))
#         if out:  # quit early on positive match (but not during testing)
#             pass  # break
#     if out is not base[1]:
#         print("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
#     else:
#         print("base image %s was correctly classified" % base[0].title)