Skip to content

Instantly share code, notes, and snippets.

@trenthudy
Last active July 25, 2019 01:03
Show Gist options
  • Save trenthudy/d6cb6b223676cae349a2c40398dc45aa to your computer and use it in GitHub Desktop.
Save trenthudy/d6cb6b223676cae349a2c40398dc45aa to your computer and use it in GitHub Desktop.
A simple Android threading solution, that allows work to be lifted from the main thread and delegated to a background thread pool.
package io.hudepohl.custom_thread_sample;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import androidx.annotation.MainThread;
import androidx.annotation.WorkerThread;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Trent Hudepohl on 4/12/18.
*
* A simple Android threading solution, that allows work to be lifted from the main thread
* and delegated to a background thread pool. After the background work completes, its result
* is dispatched back to the main thread.
*
* This solution is inspired by Android's 'android.os.AsyncTask', but aims to improve some of its
* flaws / missing features:
*
* - Too abstract for most use cases (i.e. it requires three generic type parameters).
* - Lack of a clear distinction between an "error" and a "result".
* - Exceptions thrown during the work execution are not passed back cleanly.
* - Lack of a clear state update callback (i.e. when the work starts "processing").
* - Exposes the complexities of multi-threading, when most use cases do not require it.
*
* Uses 'java.util.concurrent.ThreadPoolExecutor' to create a thread pool with a size based on
* the device's CPU count. While this should satisfy the majority of use cases, projects doing
* a ton of background processing should consider a more robust solution (like RxJava).
*
* The work is not started until 'execute()' is called. All instances of BackgroundWork can only
* be executed one time. After 'execute()' is called the first time, subsequent calls will result
* in an 'java.lang.IllegalStateException'.
*
* Work can be cancelled by calling 'cancel()'. It is recommended to call 'cancel()' on work that
* is no longer needed whenever possible. Calling 'execute()' on work that has already been
* cancelled will result in an 'java.lang.IllegalStateException'.
*
* The three terminating states are 'RESULT', 'ERROR', and 'CANCELLED'. After 'onStateChange()'
* has received one of these states, no other callbacks will be invoked.
*/
public abstract class BackgroundWork<T> {
private static final String THREAD_NAME_PREFACE = "BackgroundWork_";
private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, THREAD_NAME_PREFACE + mCount.getAndIncrement());
}
};
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = (CPU_COUNT * 2);
private static final int KEEP_ALIVE_TIME = 30;
private static final TimeUnit KEEP_ALIVE_TIME_UNIT = TimeUnit.SECONDS;
private static final BlockingQueue<Runnable> POOL_WORK_QUEUE = new LinkedBlockingQueue<>(128);
private static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
KEEP_ALIVE_TIME_UNIT,
POOL_WORK_QUEUE,
THREAD_FACTORY
);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
private final AtomicBoolean wasInvoked = new AtomicBoolean(false);
private final AtomicBoolean wasErrored = new AtomicBoolean(false);
private final AtomicBoolean wasCancelled = new AtomicBoolean(false);
private final Handler handler;
private final FutureTask<T> futureTask;
private T result = null;
private Throwable error = null;
public BackgroundWork() {
this.handler = new MainThreadHandler();
Callable<T> callableWork = new Callable<T>() {
@Override
public T call() {
wasInvoked.set(true);
publishProcessing();
try {
result = doBackgroundWork();
publishResult();
} catch (Exception exception) {
if (!wasCancelled.get()) {
if (wasErrored.compareAndSet(false, true)) {
error = exception;
publishError();
}
}
}
return result;
}
};
this.futureTask = new FutureTask<T>(callableWork) {
@Override
protected void done() {
try {
get();
} catch (Exception exception) {
if (!wasCancelled.get()) {
if (wasErrored.compareAndSet(false, true)) {
error = (exception instanceof ExecutionException) ?
exception.getCause() : exception;
publishError();
}
}
}
}
};
}
private void publishPending() {
this.handler.obtainMessage(MESSAGE_PENDING, this).sendToTarget();
}
private void publishProcessing() {
this.handler.obtainMessage(MESSAGE_PROCESSING, this).sendToTarget();
}
private void publishResult() {
this.handler.obtainMessage(MESSAGE_RESULT, this).sendToTarget();
}
private void publishError() {
this.handler.obtainMessage(MESSAGE_ERROR, this).sendToTarget();
}
private void publishCancelled() {
if (this.wasCancelled.compareAndSet(false, true)) {
this.handler.obtainMessage(MESSAGE_CANCELLED, this).sendToTarget();
}
}
public void execute() {
if (this.wasInvoked.get()) throw new IllegalStateException("BackgroundWork was already executed");
if (this.wasCancelled.get()) throw new IllegalStateException("BackgroundWork was cancelled");
publishPending();
THREAD_POOL_EXECUTOR.execute(this.futureTask);
}
public final boolean cancel() {
publishCancelled();
return this.futureTask.cancel(true);
}
@WorkerThread
protected abstract T doBackgroundWork();
@MainThread
protected abstract void onResult(@NotNull T result);
@MainThread
protected abstract void onError(@NotNull Throwable error);
public enum State {
PENDING,
PROCESSING,
RESULT,
ERROR,
CANCELLED
}
@MainThread
protected abstract void onStateChange(@NotNull State workState);
private static final int MESSAGE_PENDING = 0x1;
private static final int MESSAGE_PROCESSING = 0x2;
private static final int MESSAGE_RESULT = 0x3;
private static final int MESSAGE_ERROR = 0x4;
private static final int MESSAGE_CANCELLED = 0x5;
private static class MainThreadHandler extends Handler {
MainThreadHandler() {
super(Looper.getMainLooper());
}
@Override
public void handleMessage(Message msg) {
BackgroundWork<?> work = (BackgroundWork<?>) msg.obj;
switch (msg.what) {
case MESSAGE_PENDING:
work.notifyPending();
break;
case MESSAGE_PROCESSING:
work.notifyProcessing();
break;
case MESSAGE_RESULT:
work.notifyResult();
break;
case MESSAGE_ERROR:
work.notifyError();
break;
case MESSAGE_CANCELLED:
work.notifyCancelled();
break;
}
}
}
private void notifyPending() {
onStateChange(State.PENDING);
}
private void notifyProcessing() {
onStateChange(State.PROCESSING);
}
private void notifyResult() {
if (this.result != null) {
onResult(this.result);
}
onStateChange(State.RESULT);
}
private void notifyError() {
if (this.error != null) {
onError(this.error);
}
onStateChange(State.ERROR);
}
private void notifyCancelled() {
onStateChange(State.CANCELLED);
}
}
class BackgroundWorkExample {
fun doSomethingCool() {
val work = object : BackgroundWork<String>() {
override fun doBackgroundWork(): String {
Thread.sleep(10_000)
return "Hello, world!"
}
override fun onResult(result: String) {
}
override fun onError(error: Throwable) {
}
override fun onStateChange(workState: State) {
when (workState) {
State.PENDING,
State.PROCESSING,
State.RESULT,
State.ERROR,
State.CANCELLED -> {
}
}
}
}
// do the work
work.execute()
// if you change your mind
work.cancel()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment