Skip to content

Instantly share code, notes, and snippets.

@altescy
Last active February 25, 2023 04:46
Show Gist options
  • Save altescy/8bf2c57710daf6153d20fde1498b5cea to your computer and use it in GitHub Desktop.
Save altescy/8bf2c57710daf6153d20fde1498b5cea to your computer and use it in GitHub Desktop.
import logging
import time
from multiprocessing import Process, Queue
from typing import Callable, Generic, Iterator, TypeVar
from uuid import uuid4
import fastapi
from pydantic import BaseModel
# Generic placeholder for the item type: what a Task's callable yields and
# what a Response carries in its ``result`` list.
T = TypeVar("T")
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Done:
    """Marker object a worker puts on its queue after its last item."""
class Task(Process, Generic[T]):
    """Process that runs an iterator-producing callable and streams its items.

    ``run()`` executes in the child process once ``start()`` is called and
    pushes every item yielded by ``func`` onto an internal queue, followed by
    a ``Done`` marker.  The creating process collects items with ``result()``.

    Args:
        taskid: Identifier exposed through the ``taskid`` property.
        func: Zero-argument callable returning an iterator of items.
    """

    def __init__(self, taskid: str, func: Callable[[], Iterator[T]]) -> None:
        super().__init__()
        self._taskid = taskid
        self._func = func
        self._queue: Queue[T | Done] = Queue()
        # Only ever updated by result(), i.e. on the consuming side.
        self._done = False

    @property
    def taskid(self) -> str:
        """Identifier given at construction time."""
        return self._taskid

    @property
    def done(self) -> bool:
        """Whether ``result()`` has already seen the ``Done`` marker."""
        return self._done

    def run(self) -> None:
        """Forward every item produced by ``func`` to the queue.

        The ``Done`` marker is enqueued in a ``finally`` clause so that a
        callable that raises part-way through still signals completion;
        otherwise the consumer would wait for a marker that never arrives.
        The exception itself is not swallowed and propagates as before.
        """
        try:
            for output in self._func():
                self._queue.put(output)
        finally:
            self._queue.put(Done())

    def result(self) -> list[T]:
        """Drain the items currently available on the queue.

        Returns:
            The items read during this call, in queue order.  Reading stops
            at the ``Done`` marker, which also flips ``done`` to ``True``.
        """
        result: list[T] = []
        while not self._queue.empty():
            item = self._queue.get()
            if isinstance(item, Done):
                self._done = True
                break
            result.append(item)
        return result
# Payload returned by the status endpoint for a single poll of a task.
# Documented with comments rather than a docstring so the generated schema
# stays exactly as before.
class Response(BaseModel, Generic[T]):
    # Identifier of the task this payload describes.
    taskid: str

    # Whether the task had finished at the time of the poll.
    finished: bool

    # Items collected from the task during this poll.
    result: list[T]
# FastAPI application on which the route decorators in this file register.
app = fastapi.FastAPI()
# Registry of tasks keyed by task id; execute() adds entries, status() and
# terminate() remove them.
tasks: dict[str, Task] = {}
def multistep_task(steps: int = 10, delay: float = 1.0) -> Iterator[str]:
    """Yield one result string per step, pausing before each one.

    Called without arguments it behaves as before: ten steps, one second
    apart.

    Args:
        steps: Number of results to produce; zero or less yields nothing.
        delay: Seconds to sleep before each result (passed to ``time.sleep``).

    Yields:
        ``"result of step {i}"`` for ``i`` from ``0`` to ``steps - 1``.
    """
    for i in range(steps):
        time.sleep(delay)
        yield f"result of step {i}"
# Start a new background task running multistep_task and return its id.
# The task is registered in ``tasks`` so the status/terminate routes can
# find it later.
@app.post("/api/execute")
async def execute() -> str:
    worker = Task(uuid4().hex, multistep_task)
    worker.start()
    tasks[worker.taskid] = worker
    return worker.taskid
# Report the items a task has produced since the previous poll.
#
# Responds with 404 when the id is unknown.  A finished task is removed from
# the registry, so a later poll for the same id also yields 404.
@app.get("/api/status/{task_id}")
async def status(task_id: str) -> Response[str]:
    task = tasks.get(task_id)
    if task is None:
        raise fastapi.HTTPException(status_code=404, detail="Task not found")
    result = task.result()
    finished = task.done
    if not finished and not task.is_alive():
        # The worker process has exited but its completion marker was not
        # seen (it may have arrived after the drain above, or the worker may
        # have died before sending it).  Drain once more and report the task
        # as finished so the client does not poll forever and the registry
        # entry is released.
        result.extend(task.result())
        finished = True
    if finished:
        tasks.pop(task_id, None)
    response = Response(taskid=task.taskid, finished=finished, result=result)  # type: ignore[var-annotated]
    return response
# Stop a running task and drop it from the registry.
# Responds with 404 when the id is unknown.
@app.post("/api/status/{task_id}/terminate")
async def terminate(task_id: str) -> None:
    if task_id not in tasks:
        raise fastapi.HTTPException(status_code=404, detail="Task not found")
    target = tasks[task_id]
    target.terminate()
    del tasks[target.taskid]
# Serve the single-page demo UI.
#
# The page has an Execute button that POSTs /api/execute and then polls
# /api/status/{id} once per second, appending each result line, and a
# Terminate button that POSTs /api/status/{id}/terminate.
#
# The Terminate button now carries the valid ``disabled`` attribute (the
# original ``disable`` is not an HTML attribute, so the button started out
# enabled).
#
# NOTE(review): the polling loop reads ``status.result`` without checking
# ``response.ok``; a 404 for an already-removed task would make that
# iteration throw.  Consider guarding it.
@app.get("/", response_class=fastapi.responses.HTMLResponse)
async def root() -> str:
    return """
<!DOCTYPE html>
<html>
<head>
<title>FastAPI</title>
</head>
<body>
<button id="execute-btn">Execute</button>
<button id="terminate-btn" disabled>Terminate</button>
<div id="status">...</div>
<div
id="results"
style="background-color: #000; color: #fff; padding: 1em;"
></div>
<script>
let running_taskid = null;
let terminated = false;
document.getElementById("execute-btn").addEventListener("click", async () => {
const response = await fetch("/api/execute", {method: "POST"});
running_taskid = await response.json();
terminated = false;
document.getElementById("status").innerText = `Running task ${running_taskid}...`;
document.getElementById("results").innerText = "";
document.getElementById("execute-btn").disabled = true;
document.getElementById("terminate-btn").disabled = false;
while (!terminated) {
const response = await fetch(`/api/status/${running_taskid}`);
const status = await response.json();
for (const result of status.result) {
let line = document.createElement("div");
line.innerText = result;
document.getElementById("results").appendChild(line);
}
if (status.finished) {
break;
}
await new Promise(resolve => setTimeout(resolve, 1000));
}
if (!terminated) {
document.getElementById("status").innerText = `Task ${running_taskid} successfully finished!`;
}
document.getElementById("execute-btn").disabled = false;
document.getElementById("terminate-btn").disabled = true;
running_taskid = null;
});
document.getElementById("terminate-btn").addEventListener("click", async () => {
if (running_taskid === null) {
return;
}
await fetch(`/api/status/${running_taskid}/terminate`, {method: "POST"});
document.getElementById("status").innerText = `Task ${running_taskid} terminated!`;
document.getElementById("execute-btn").disabled = false;
document.getElementById("terminate-btn").disabled = true;
running_taskid = null;
terminated = true;
});
</script>
</body>
</html>
"""
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Serve the app on every interface, port 8000.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )
@altescy
Copy link
Author

altescy commented Jan 28, 2023

  1. Execute the following commands:
pip install fastapi uvicorn
curl -s https://gist.githubusercontent.com/altescy/8bf2c57710daf6153d20fde1498b5cea/raw | python
  2. Open http://localhost:8000 in your browser.

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment