|
import cv2 |
|
import time |
|
import uiautomator2 as u2 |
|
import numpy as np |
|
import requests |
|
import math |
|
import os |
|
import json |
|
|
|
# If you run the script directly on mobile, set this to True to disable

# incompatible functions, like real-time image view, and configure for this

RUN_ON_MOBILE = False



# NOTE: Non-mandatory, you can skip this.

# Replace this URL with your MJPEG stream URL for screen capture

# Use ScreenStream app - info.dvkr.screenstream for the feed

# IMPORTANT: SET RESIZE IMAGE to 100% in STREAM SETTINGS !!!

stream_url = "http://192.168.0.123:8080/stream.mjpeg"

# Alternatively, you can use the built-in uiautomator2 screenshot function,

# but it is slow ~1 FPS or under,

# it will be used automatically if the script can't connect to the stream_url



# If you want to check the energy level, you need Tesseract installed and configured

# When the energy level is under 2, the game will exit

CHECK_ENERGY_LEVEL = True



# Auto-press the generator when no match is found, only if check energy level is enabled

if CHECK_ENERGY_LEVEL:

    # Generator positions to press, in a list

    # NOTE(review): these look 1-based (annotate_image draws contours[pos - 1]) — confirm

    GENERATOR_POSITIONS = [1, 2, 3, 4]

    # When there's no match, generate objects from each of these generators

    # Minimum energy to generate items

    MIN_ENERGY_LEVEL = 3



# Get the energy from the 15 seconds product list view

AUTO_FARM_ENERGY = True and CHECK_ENERGY_LEVEL



# Only try to get energy 3 times

MAX_FARM_SESSIONS = 3



# The first 11 squares will be ignored. Adjust to your number of e.g., generators.

IGNORED_MATCH_POSITIONS = 11



# Define the similarity threshold between items (see group_similar_imgs)

SIMILARITY_THRESHOLD = 0.85



# If there are a maximum of X matches groups left, press the generators

# You can set to 0 if you want to use the generator when there's no match

MAX_GENERATOR_GROUP_NUMBERS = 2



# NOTE: you should adjust these based on your phone display resolution.

# These are for 1080x2400 and represent percentages of height or width.

ROI_TOP = 0.355 # 852px height

ROI_BOTTOM = 0.9025 # 2166px height

ROI_PADDING = 0.0287 # 31px width

# Energy left number position

ENG_TOP = 0.05 # 120px height

ENG_BOTTOM = 0.07 # 168px height

ENG_LEFT = 0.484 # 523px width

ENG_RIGHT = 0.566 # 612px width

# Energy browse deals "Go" button position

GO_TOP = 0.6065 # 1455px height

GO_LEFT = 0.276 # 298px width

# Exit "X" button from task list position

EX_TOP = 0.145 # 350px height

EX_LEFT = 0.926 # 1000px width



# Space between grid squares, px

GRID_PADDING = 7
|
|
|
# Load optional overrides from the user's JSON config file.
# Any key missing from the file keeps the default defined above.
config_path = os.path.expanduser("~/bot_config.json")
if os.path.exists(config_path):
    with open(config_path, 'r') as json_file:
        loaded_data = json.load(json_file)

    # Use .get() with the current value as fallback so a partial config
    # (e.g. written by an older version) does not raise KeyError.
    RUN_ON_MOBILE = loaded_data.get("RUN_ON_MOBILE", RUN_ON_MOBILE)
    IGNORED_MATCH_POSITIONS = loaded_data.get("IGNORED_MATCH_POSITIONS", IGNORED_MATCH_POSITIONS)
    GENERATOR_POSITIONS = loaded_data.get("GENERATOR_POSITIONS", GENERATOR_POSITIONS)
    ROI_TOP = loaded_data.get("ROI_TOP", ROI_TOP)
    ROI_BOTTOM = loaded_data.get("ROI_BOTTOM", ROI_BOTTOM)
    ROI_PADDING = loaded_data.get("ROI_PADDING", ROI_PADDING)
    ENG_TOP = loaded_data.get("ENG_TOP", ENG_TOP)
    ENG_BOTTOM = loaded_data.get("ENG_BOTTOM", ENG_BOTTOM)
    ENG_LEFT = loaded_data.get("ENG_LEFT", ENG_LEFT)
    ENG_RIGHT = loaded_data.get("ENG_RIGHT", ENG_RIGHT)
    GO_TOP = loaded_data.get("GO_TOP", GO_TOP)
    GO_LEFT = loaded_data.get("GO_LEFT", GO_LEFT)
    EX_TOP = loaded_data.get("EX_TOP", EX_TOP)
    EX_LEFT = loaded_data.get("EX_LEFT", EX_LEFT)
    GRID_PADDING = loaded_data.get("GRID_PADDING", GRID_PADDING)
    MIN_ENERGY_LEVEL = loaded_data.get("MIN_ENERGY_LEVEL", MIN_ENERGY_LEVEL)
    MAX_FARM_SESSIONS = loaded_data.get("MAX_FARM_SESSIONS", MAX_FARM_SESSIONS)
    SIMILARITY_THRESHOLD = loaded_data.get("SIMILARITY_THRESHOLD", SIMILARITY_THRESHOLD)
    MAX_GENERATOR_GROUP_NUMBERS = loaded_data.get("MAX_GENERATOR_GROUP_NUMBERS", MAX_GENERATOR_GROUP_NUMBERS)
else:
    print(f"The file {config_path} does not exist. Using default values.")
|
|
|
# Applies Sobel edge detection to highlight edges in the image, with a user-defined threshold.
def sobel_edge_detector(img, threshold=50):
    """Return a binary (0/255) edge mask of a grayscale image.

    img: single-channel image.
    threshold: cutoff (0-255) applied to the normalized gradient magnitude.
    """
    grad_x = cv2.Sobel(img, cv2.CV_32F, 1, 0)
    grad_y = cv2.Sobel(img, cv2.CV_32F, 0, 1)
    grad = np.sqrt(grad_x**2 + grad_y**2)

    # Guard against a perfectly uniform image: grad.max() would be 0 and the
    # original normalization divided by it, producing NaNs. No edges -> empty mask.
    max_grad = grad.max()
    if max_grad == 0:
        return np.zeros(img.shape[:2], dtype=np.uint8)

    grad_norm = (grad * 255 / max_grad).astype(np.uint8)

    _, binary_edge = cv2.threshold(grad_norm, threshold, 255, cv2.THRESH_BINARY)

    return binary_edge
|
|
|
# Display the extracted images after applying apply_processing

# (real-time windows are forced off when running on the phone itself)

DISPLAY_EXTRACTED_IMGS = True and not RUN_ON_MOBILE



# Display the annotated image

DISPLAY_ANNOTATED_IMGS = True and not RUN_ON_MOBILE



# pytesseract is only needed (and therefore only imported) for OCR of the energy counter

if CHECK_ENERGY_LEVEL:

    import pytesseract

    print("Make sure you have Tesseract installed on the system and added to PATH")
|
|
|
# Applies image processing techniques, including Gaussian blur and Adaptive Thresholding, Sobel edge detection or simple thresholding.
def apply_processing(img, block_size=7, C=5, blur_size=5, sobel=False, sob_thresh = 35, simply_thresh=False, thresh_val=200):
    """Convert a BGR image to a binary mask.

    Modes, checked in order (mutually exclusive):
    - simply_thresh: fixed-level inverted binary threshold at thresh_val
    - sobel: Sobel edge mask (computed on the raw grayscale, not the blurred copy)
    - default: inverted Gaussian adaptive threshold of the blurred image
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    smoothed = cv2.GaussianBlur(gray, (blur_size, blur_size), 0)

    if simply_thresh:
        # Inverted so the features of interest come out white
        _, simple_mask = cv2.threshold(smoothed, thresh_val, 255, cv2.THRESH_BINARY_INV)
        return simple_mask

    if sobel:
        # Edge extraction runs on the unblurred grayscale image
        return sobel_edge_detector(gray, sob_thresh)

    # Adaptive thresholding copes with uneven lighting across the board
    adaptive_mask = cv2.adaptiveThreshold(
        smoothed, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, block_size, C
    )

    return cv2.bitwise_not(adaptive_mask)
|
|
|
# Creates a grid of rectangular contours within the specified region of interest (ROI) with padding.
def generate_grid_contours(img, roi, padding=GRID_PADDING):
    """Build a 9x7 grid of square contours covering the board ROI.

    img: screenshot; kept for interface compatibility (size comes from roi).
    roi: (roi_min, roi_max, width_padding) — vertical pixel bounds and left offset.
    padding: inner margin of each grid square, in pixels.
    Returns an ndarray of 4-point contours in row-major order.
    """
    # The original also unpacked img.shape into unused height/width locals,
    # which additionally failed on grayscale input; removed as dead code.
    roi_min, roi_max, width_padding = roi
    max_rows, max_cols = 9, 7
    square_size = (roi_max - roi_min) // max_rows

    contours = []  # List to store contours

    # Build the rectangle mesh, shrinking each square by the padding
    for row in range(max_rows):
        for col in range(max_cols):
            x = (col * square_size) + int(width_padding) + padding
            y = roi_min + row * square_size + padding
            side = square_size - 2 * padding
            contours.append(np.array([(x, y), (x + side, y),
                                      (x + side, y + side),
                                      (x, y + side)]))

    # Convert the list of contours to array format
    return np.array(contours)
|
|
|
# Extracts images from a list of contours. Applies image processing (Sobel edge detection) and morphological dilation for better blob extraction.
img_dilation_kernel = np.ones((7, 7), np.uint8)
def extract_imgs_from_contours(img, contours, apply_process=True):
    """Crop one sub-image per contour from the frame.

    With apply_process=True the whole frame is first turned into a dilated
    Sobel edge mask, so each crop is a binary blob image; otherwise the raw
    frame is cropped as-is.
    """
    if apply_process:
        edge_mask = apply_processing(img, sobel=True)
        # Dilation thickens the edges so blobs compare more reliably
        source = cv2.dilate(edge_mask, img_dilation_kernel, iterations=1)
    else:
        source = img

    crops = []
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        crops.append(source[y:y + h, x:x + w])

    return crops
|
|
|
# Determines if an image is blank based on the number of non-zero pixels, using a specified threshold.
def is_blank_img(img, threshold_pixels = 200):
    """Return True when the binary image has fewer than threshold_pixels white (255) pixels."""
    white_pixel_count = np.sum(img == 255)
    return white_pixel_count < threshold_pixels
|
|
|
# Compares two images by finding differing pixels and calculates a normalized similarity metric.
def compare_imgs(img1, img2):
    """Return similarity in [0, 1]; 1.0 means the binary images are identical."""
    rows, cols = img1.shape

    # XOR lights up every pixel where the two binary images disagree
    mismatch_count = np.count_nonzero(np.bitwise_xor(img1, img2))

    return 1 - (mismatch_count / (rows * cols))
|
|
|
# Groups similar images based on the specified similarity threshold, ignoring blank and ignored positions. Returns a list of grouped items. |
|
def group_similar_imgs(imgs, compare_threshold=0.8): |
|
grouped_items = [] |
|
visited = set() |
|
|
|
for i, img1 in enumerate(imgs): |
|
if is_blank_img(img1) or i < IGNORED_MATCH_POSITIONS: |
|
visited.add(i) |
|
continue |
|
|
|
if i not in visited: |
|
group = [i] |
|
found_match = False # Flag to check if any similar blob is found |
|
|
|
for j, img2 in enumerate(imgs): |
|
if is_blank_img(img2) or j < IGNORED_MATCH_POSITIONS: |
|
visited.add(j) |
|
continue |
|
if i != j and j not in visited: |
|
similarity = compare_imgs(img1, img2) |
|
if similarity > compare_threshold: |
|
group.append(j) |
|
visited.add(j) |
|
found_match = True |
|
|
|
# Add the group only if a match was found |
|
if found_match: |
|
grouped_items.append(group) |
|
for index in group: |
|
visited.add(index) |
|
|
|
return grouped_items |
|
|
|
# Annotates an image with marked regions of interest (ROI), ignored contours, and marked contours within groups.
# Contours are drawn with different colors and labeled with their respective group IDs.
def annotate_image(img, contours, groups, roi):
    """Draw the ROI box, ignored squares, generator squares and match groups onto img (in place) and return it."""
    height, width, _ = img.shape

    # Unpack region of interest
    roi_min, roi_max, width_padding = roi

    # Yellow box around the board region of interest
    cv2.rectangle(img, (0, roi_min), (width, roi_max), (0, 255, 255), 6)

    # Red outline on every square excluded from matching
    for ignored_idx in range(IGNORED_MATCH_POSITIONS):
        cv2.drawContours(img, [contours[ignored_idx]], 0, (0, 0, 255), 4)

    if CHECK_ENERGY_LEVEL:
        # Orange outline on the generator squares (positions are 1-based)
        for pos in GENERATOR_POSITIONS:
            cv2.drawContours(img, [contours[pos - 1]], 0, (0, 160, 255), 6)

    # Color each group distinctly and label every member with its group id
    for group_id, member_indices in enumerate(groups):
        color = (group_id * 30) % 255
        for member in member_indices:
            square = contours[member]
            cv2.drawContours(img, [square], 0, (color, 127, 50), 3)
            cv2.putText(img, str(group_id), tuple(square[0]), cv2.FONT_HERSHEY_SIMPLEX, 2, (color, 127, 50), 4)

    return img
|
|
|
# Swipes through elements within groups on a device, avoiding repeated swiping of positions.
def swipe_elements(device, contours, groups, roi):
    """Merge matched squares by swiping consecutive group members onto each other.

    Each board position participates in at most one swipe per call, so a
    square that just merged is not dragged again in the same pass.
    """
    roi_min, roi_max, width_padding = roi
    already_swiped_positions = set()

    for group_id, member_indices in enumerate(groups):
        # Walk the group pairwise: member i is swiped onto member i+1
        for idx_a, idx_b in zip(member_indices, member_indices[1:]):
            xa, ya, wa, ha = cv2.boundingRect(contours[idx_a])
            xb, yb, wb, hb = cv2.boundingRect(contours[idx_b])

            center_a = (xa + wa // 2, ya + ha // 2)
            center_b = (xb + wb // 2, yb + hb // 2)

            # Skip if either square already took part in a swipe this pass
            if center_a in already_swiped_positions or center_b in already_swiped_positions:
                continue

            # Drag from the center of the first square to the center of the second
            device.swipe(center_a[0], center_a[1], center_b[0], center_b[1], 0.05)

            already_swiped_positions.update((center_a, center_b))

    if len(groups) > 0:
        # Touch the first element of the list after merge, because of contouring
        x_first, y_first, _, _ = cv2.boundingRect(contours[0])
        device.click(x_first, y_first)
|
|
|
# Generates objects by clicking on specified generator positions.
def generate_objects(device, contours, img):
    """Tap each generator square twice while enough energy remains.

    Returns False as soon as the OCR'd energy is at or below
    MIN_ENERGY_LEVEL, True when all generators were pressed.
    """
    energy_left = get_energy_level(img)
    for pos in GENERATOR_POSITIONS:
        # Check energy before doing any work for this generator
        if energy_left <= MIN_ENERGY_LEVEL:
            print("No energy left")
            return False
        # GENERATOR_POSITIONS is 1-based (annotate_image highlights
        # contours[pos - 1]); the original indexed contours[pos], which
        # tapped the square AFTER each generator — off by one.
        x, y, _, _ = cv2.boundingRect(contours[pos - 1])
        # Double-tap the generator square
        device.click(x, y)
        device.click(x, y)
        energy_left = energy_left - 1

    return True
|
|
|
# Resizes input image based on the specified max height.
def resize_image(image, max_height=720):
    """Scale the image so its height equals max_height, keeping the aspect ratio.

    Accepts both color (H, W, C) and grayscale (H, W) inputs.
    """
    original_height, original_width = image.shape[:2]

    # Single scaling factor keeps the aspect ratio intact
    scale_factor = max_height / original_height

    target_width = int(original_width * scale_factor)
    target_height = int(original_height * scale_factor)

    return cv2.resize(image, (target_width, target_height))
|
|
|
# Reads the screen content using MJPEG streaming. If streaming fails, falls back to the uiautomator2's screenshot method.
using_streaming = True
def screen_stream_read(device):
    """Return one BGR frame of the device screen.

    Prefers the MJPEG stream; on any streaming failure it permanently
    switches to the slow uiautomator2 screenshot path (using_streaming
    acts as a module-level latch).
    """
    global using_streaming
    if using_streaming:
        try:
            # stream=True lets us consume the MJPEG bytes chunk by chunk;
            # the timeout keeps an unreachable host from hanging the bot.
            response = requests.get(stream_url, stream=True, timeout=5)
            try:
                if response.status_code == 200:
                    # Renamed from `bytes`, which shadowed the builtin
                    buffer = b''
                    for chunk in response.iter_content(chunk_size=1024):
                        buffer += chunk
                        start = buffer.find(b'\xff\xd8')  # JPEG start-of-image marker
                        end = buffer.find(b'\xff\xd9')    # JPEG end-of-image marker
                        if start != -1 and end != -1:
                            jpg = buffer[start:end + 2]
                            return cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
                else:
                    # The original returned None here, crashing the caller on
                    # img.shape; now we fall through to the screenshot below.
                    print("Received unexpected status code {}".format(response.status_code))
            finally:
                # Always release the streaming connection (was leaked before)
                response.close()
        except Exception:
            # Was a bare except:, which also swallowed KeyboardInterrupt
            print("Could not connect to the screen streaming service - start the ScreenStream app service!")
            print("Using low FPS screenshot method from uiautomator2")
            using_streaming = False

    return device.screenshot(format='opencv')
|
|
|
# Extracts and returns the energy level from a given image. Utilizes Tesseract OCR for text extraction.
def get_energy_level(img):
    """OCR the energy counter region of the screenshot; return 100 if unreadable.

    Requires Tesseract installed and on PATH (used through pytesseract).
    """
    height, width, _ = img.shape
    left, top = int(width * ENG_LEFT), int(height * ENG_TOP)
    right, bottom = int(width * ENG_RIGHT), int(height * ENG_BOTTOM)
    energy_patch = img[top:bottom, left:right]

    # Binarize for OCR; Otsu picks the threshold automatically
    gray = cv2.cvtColor(energy_patch, cv2.COLOR_BGR2GRAY)
    _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Flood-fill from near the corner to knock out the connected background
    cv2.floodFill(thresh, None, (5, 5), 0, flags=8)

    # psm 7 = single text line; restrict the output to digits
    custom_config = r'--oem 3 --psm 7 outputbase digits'
    text = pytesseract.image_to_string(thresh, config=custom_config)

    if DISPLAY_EXTRACTED_IMGS:
        cv2.imshow("Extracted energy", thresh)
        print("Energy: ", text.strip())
        cv2.waitKey(5)

    try:
        return int(text.strip())
    except ValueError:
        # OCR failure: assume plenty of energy rather than stopping the bot
        print("Could not detect energy level, assuming 100, text: ", text.strip())
        return 100
|
|
|
# Automatically farm energy from tasks
def farm_energy(img, device):
    """Open the tasks menu and claim the timed 'browse deals' energy rewards."""
    height, width, _ = img.shape

    # The energy counter doubles as the tasks-menu button
    device.click(width * ENG_LEFT, height * ENG_TOP)
    time.sleep(2)

    for _ in range(MAX_FARM_SESSIONS):
        # Press "Go", let the deal page sit long enough to register, come back
        device.click(width * GO_LEFT, height * GO_TOP)
        time.sleep(16)
        device.press("back")
        time.sleep(2)
        # The same spot now shows the "Claim" button
        device.click(width * GO_LEFT, height * GO_TOP)
        time.sleep(3)

    # Close the tasks menu via its "X" button
    device.click(width * EX_LEFT, height * EX_TOP)
    time.sleep(1)
|
|
|
# Combines a list of binary images into a grid with the specified number of columns and rows, just for debugging display.
def combine_binary_images(extracted_imgs, columns=7, rows=9):
    """Tile the per-square crops into one columns x rows debug image.

    Raises ValueError when the number of crops does not exactly fill the grid.
    """
    if len(extracted_imgs) != (columns * rows):
        raise ValueError(f"Number of images ({len(extracted_imgs)}) is not compatible with the grid size ({columns}x{rows}).")

    # Force every crop to the same square size so the rows stack cleanly
    # (side length taken from the first crop's height)
    side = extracted_imgs[0].shape[0]
    squares = [cv2.resize(crop, (side, side)) for crop in extracted_imgs]

    # Stack each row horizontally, then stack the rows vertically
    grid_rows = [np.hstack(squares[start:start + columns])
                 for start in range(0, len(squares), columns)]
    return np.vstack(grid_rows)
|
|
|
# Checks if the current app running on the device is Aliexpress and the screen is on.
def check_if_ali_app(device):
    """Return True when the screen is on and the foreground package is Aliexpress."""
    pkg_name = device.info.get("currentPackageName")
    screen_on = device.info.get("screenOn")
    # pkg_name can be None (key missing / transient uiautomator state); the
    # original `"aliexpress" in pkg_name` raised TypeError in that case.
    return bool(screen_on) and pkg_name is not None and "aliexpress" in pkg_name
|
|
|
# Waits for the Aliexpress app to be opened on the device.
def wait_for_ali_app(device):
    """Block until the Aliexpress game is in the foreground, polling every 10 s.

    Implemented as a loop instead of the original tail recursion, which
    grew the call stack by one frame per 10-second poll and could
    eventually hit the recursion limit during a long wait.
    """
    while not check_if_ali_app(device):
        print("Please open Aliexpress Merge Boss game. Waiting 10 seconds.")
        time.sleep(10)
    print("Aliexpress app is running")
|
|
|
# Main entry point: connect to the device, build the board grid once, then
# loop forever reading frames, grouping duplicate squares and merging them.
def main():

    print("Don't forget to open the ATX app, and ensure both AtxAgent and UIAutomator are running!\n")

    time.sleep(1)



    # Connect to the Android device (localhost when running on the phone itself)
    if RUN_ON_MOBILE:

        print("Running on mobile set to true")

        device = u2.connect('127.0.0.1')

    else:

        device = u2.connect()



    print("Checking if the Aliexpress app is running")



    wait_for_ali_app(device)



    # First frame, used only to measure the screen resolution
    img = screen_stream_read(device)



    # Only try to merge objects with a similarity above this threshold
    height, width, _ = img.shape



    # Define the region of interest for duplicate findings
    # (top px, bottom px, left-padding px — the last element stays a float)
    roi = int(ROI_TOP * height), int(ROI_BOTTOM * height), width * ROI_PADDING



    # Generate ROI grid contours once; the board geometry never changes
    grid_contours = generate_grid_contours(img, roi)



    # Remember the energy farm status (one farming run per bot session)
    farm_the_energy = AUTO_FARM_ENERGY



    while True:

        # Read the screenshot in memory
        img = screen_stream_read(device)



        if not check_if_ali_app(device):

            print("Aliexpress app is not running anymore")

            break



        # Binary blob crop per board square
        extracted_imgs = extract_imgs_from_contours(img, grid_contours, apply_process=True)



        if DISPLAY_EXTRACTED_IMGS:

            display_extracted_img = combine_binary_images(extracted_imgs)

            res_display_extracted_img = resize_image(display_extracted_img)

            cv2.imshow("Extracted images", res_display_extracted_img)



        grouped_items = group_similar_imgs(extracted_imgs, SIMILARITY_THRESHOLD)



        # Check the energy left and matches: few matches left -> press generators
        if CHECK_ENERGY_LEVEL and len(grouped_items) <= MAX_GENERATOR_GROUP_NUMBERS:

            # generate_objects returns False when energy ran out; with no
            # matches left either, try farming energy once, else stop the bot
            if generate_objects(device, grid_contours, img) == False and len(grouped_items) == 0:

                print("No group found.")

                if farm_the_energy:

                    print("Starting to farm energy.")

                    farm_energy(img, device)

                    print("Finish farming.")

                    farm_the_energy = False

                else:

                    print("No energy to farm. Exit.")

                    break



        if DISPLAY_ANNOTATED_IMGS:

            annotated_img = annotate_image(img, grid_contours, grouped_items, roi)



            # Resize image for display
            res_annotated_img = resize_image(annotated_img)



            # Display the screenshot with annotations
            cv2.imshow("Display annotations", res_annotated_img)



        # 'q' in an OpenCV window quits the bot
        if cv2.waitKey(20) & 0xFF == ord('q'):

            break

        cv2.destroyAllWindows()



        # Swipe duplicates one over another
        swipe_elements(device, grid_contours, grouped_items, roi)
|
|
|
# Run the bot only when executed as a script (not on import)
if __name__ == "__main__":

    main()
# I'd say it's great repo material ;)
# I wonder: did you try using BlueStacks to run the game?