Guide
The simplest application
Let's create our first FPS application. Enter the following code in a file called simple.py
:
from fps import Module
class Main(Module):
def __init__(self, name, **kwargs):
super().__init__(name)
self.config = kwargs
async def start(self):
print(self.config["greeting"])
async def stop(self):
print(self.config["farewell"])
And enter in the terminal:
fps simple:Main --set greeting="Hello, World!" --set farewell="See you later!"
This should print Hello, World!
and hang forever, which means that the application is running. To exit, press Ctrl-C. This should now print See you later!
and return to the terminal prompt.
What happened?
- By entering
fps simple:Main
, we told FPS to run the module calledMain
in thesimple.py
file. - Options
--set greeting="Hello, World!"
and--set farewell="See you later!"
told FPS to pass parameter keysgreeting
andfarewell
toMain.__init__
's keyword arguments, with values"Hello, World!"
and"See you later!"
, respectively. - In its startup phase (
start
method),Main
prints thegreeting
parameter value. - After starting, the application runs until it is stopped. Pressing Ctrl-C stops the application, calling its teardown phase.
- In its teardown phase (
stop
method),Main
prints thefarewell
parameter value.
Sharing objects between modules
Now let's see how we can share objects between modules. Enter the following code in a file called share.py
:
from anyio import Event, sleep
from fps import Module
class Main(Module):
def __init__(self, name):
super().__init__(name)
self.add_module(Publisher, "publisher")
self.add_module(Consumer, "consumer")
class Publisher(Module):
async def start(self):
self.shared = Event() # the object to share
self.put(self.shared, Event) # publish the shared object as type Event
print("Published:", self.shared.is_set())
await self.shared.wait() # wait for the shared object to be updated
self.exit_app() # force the application to exit
async def stop(self):
print("Got:", self.shared.is_set())
class Consumer(Module):
def __init__(self, name, wait=0):
super().__init__(name)
self.wait = float(wait)
async def start(self):
shared = await self.get(Event) # request an object of type Event
print("Acquired:", shared.is_set())
await sleep(self.wait) # wait before updating the shared object
shared.set() # update the shared object
print("Updated:", shared.is_set())
And enter in the terminal:
fps share:Main
You should see in the terminal:
Published: False
Acquired: False
Updated: True
Got: True
Sharing objects between modules is based on types: a module (Consumer
) requests an object of a given type (Event
) with await self.get
, and it eventually acquires it when another module (Publisher
) publishes an object of this type with self.put
. It is the same object that they are sharing, so if Consumer
changes the object, Publisher
sees it immediatly.
The Consumer
's default value for parameter wait
is 0, which means that the shared object will be updated right away. If we set it to 0.5 seconds:
fps share:Main --set consumer.wait=0.5
You should see that the application hangs for half a second after the shared object is acquired. This illustrate that we can configure any nested module in the application, just by providing the path to its parameter in the CLI. If we provide a wrong parameter name, we get a nice error:
fps share:Main --set consumer.wrong_parameter=0.5
RuntimeError: Cannot instantiate module 'root_module.consumer': Consumer.__init__() got an unexpected keyword argument 'wrong_parameter'
A pluggable web server
FPS comes with a FastAPIModule
that publishes a FastAPI
application. This FastAPI
object can be shared with other modules, which can add routes to it. As part of its startup phase, FastAPIModule
serves the FastAPI
application with a web server. Enter the following code in a file called server.py
:
from fastapi import FastAPI
from fps import Module
from fps.web.fastapi import FastAPIModule
from pydantic import BaseModel
class Main(Module):
def __init__(self, name):
super().__init__(name)
self.add_module(FastAPIModule, "fastapi")
self.add_module(Router, "router")
class Router(Module):
def __init__(self, name, **kwargs):
super().__init__(name)
self.config = Config(**kwargs)
async def prepare(self):
app = await self.get(FastAPI)
@app.get("/")
def read_root():
return {self.config.key: self.config.value}
class Config(BaseModel):
key: str = "count"
value: int = 3
And enter in the terminal:
fps server:Main
Now if you open a browser at http://127.0.0.1:8000
, you should see:
{"count":3}
Note that Router
has a prepare
method. It is similar to the start
method, be it is executed just before. Typically, this is used by modules like FastAPIModule
which must give a chance to every other module to register their routes on the FastAPI
application, before running the server in start
, because routes cannot be added once the server has started.
See how Router
uses a Pydantic model Config
to validate its configuration. With this, running the application with a wrong type will not work:
fps main:Main --set router.value=foo
# RuntimeError: Cannot instantiate module 'root_module.router': 1 validation error for Config
# value
# Input should be a valid integer, unable to parse string as an integer [type=int_parsing, input_value='foo', input_type=str]
# For further information visit https://errors.pydantic.dev/2.10/v/int_parsing
Jupyverse uses FastAPIModule
in order to compose a Jupyter server from swappable pluggins.
A declarative application
It is possible to configure an application entirely as a Python dictionary or a JSON file. Let's rewrite the previous example in router.py
, and just keep the code for the Router
module:
from fastapi import FastAPI
from fps import Module
from pydantic import BaseModel
class Router(Module):
def __init__(self, name, **kwargs):
super().__init__(name)
self.config = Config(**kwargs)
async def prepare(self):
app = await self.get(FastAPI)
@app.get("/")
def read_root():
return {self.config.key: self.config.value}
class Config(BaseModel):
key: str = "count"
value: int = 3
Now we can write a config.json
file like so:
{
"main": {
"type": "fps_module",
"modules": {
"fastapi": {
"type": "fps.web.fastapi:FastAPIModule"
},
"router": {
"type": "router:Router",
"config": {
"value": 7
}
}
}
}
}
And launch our application with:
fps --config config.json
Note that the type
field in config.json
can be a path to a module, like fps.web.fastapi:FastAPIModule
or router:Router
, or a module name registered in the fps.modules
entry-point group, like fps_module
which is a base FPS Module
.
A note on concurrency
The following Module
methods are run as background tasks:
prepare
start
stop
FPS will consider each of them to have completed if they run to completion, or if they call self.done()
. Let's consider the following example:
from anyio import sleep
from fps import Module
class MyModule(Module):
async def start(self):
await sleep(float("inf"))
FPS will notice that this module never completes the startup phase, because its start
method hangs indefinitely. By default, this will time out after one second. The solution is to launch a background task and then explicitly call self.done()
, like so:
from anyio import create_task_group, sleep
from fps import Module
class MyModule(Module):
async def start(self):
async with create_task_group() as tg:
tg.start_soon(sleep, float("inf"))
self.done()
Contexts
FPS offers a Context
class that allows to share objects independantly of modules. For instance, say you want to share a file object. Here is how you would do:
from io import TextIOWrapper
from anyio import run
from fps import Context
async def main():
async with Context() as context:
file = open("log.txt", "w")
print("File opened")
def teardown_callback():
file.close()
print("File closed")
shared_file = context.put(file, teardown_callback=teardown_callback)
print("File object published")
acquired_file = await context.get(TextIOWrapper)
print("File object acquired")
assert acquired_file.unwrap() is file
print("Writing to file")
acquired_file.unwrap().write("Hello, World!\n")
acquired_file.drop()
print("File object dropped")
await shared_file.freed()
run(main)
Running this code will print:
File opened
File object published
File object acquired
Writing to file
File object dropped
File closed
Let's see what happened:
- We created an object that we want to share, here file
. This file has to be closed eventually.
- We published it in the context
, with context.put(file, teardown_callback=teardown_callback)
. The teardown_callback
will be called when the context is closed. We got a shared_file
handle that we can use to check if the object is still in use (see below).
- We acquired the file object with await context.get(TextIOWrapper)
, and we got an acquired_file
handle that we can use to drop the object when we are done using it. Note that acquiring an object is usually done in some other part of the program, where only the context
is available.
- We write to the file using acquired_file.unwrap().write("Hello, World!\n")
. Note that we call unwrap()
to get the actual object, since our handle is a wrapper around the object.
- We drop the file object with acquired_file.drop()
, notifying the shared_file
that we are done using it and that from our point of view it is safe to close it.
- The publisher can check that the published file is not used anymore with await shared_file.freed()
.
- When the context
is closed, it waits for every published object to be freed and then it proceeds with their teardown, if any.
Contexts ensure that objects are shared safely by their "owner" and that they are torn down when they are not being used anymore, by keeping references of "borrowers". Borrowers must collaborate by explicitly dropping objects when they are done using them. Owners can explicitly check that their objects are free to be disposed, althoug this is optional.
Signals
FPS offers a Signal
class which allows one part of the code to send values that can be received in another part. One can listen to a signal by connecting a callback to it or simply by iterating values from it.
The following code uses a callback:
from anyio import run
from fps import Signal
async def main():
signal = Signal()
async def callback(value):
print("Received:", value)
signal.connect(callback)
await signal.emit("Hello")
await signal.emit("World!")
run(main)
# prints:
# Received: Hello
# Received: World!
And the following code uses an iterator:
from anyio import TASK_STATUS_IGNORED, create_task_group, run
from anyio.abc import TaskStatus
from fps import Signal
async def main():
signal = Signal()
async def iterate_signal(*, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
async with signal.iterate() as iterator:
task_status.started()
async for value in iterator:
if not value:
return
print("Received:", value)
async with create_task_group() as tg:
await tg.start(iterate_signal)
await signal.emit("Hello")
await signal.emit("World!")
await signal.emit("")
run(main)
# prints:
# Received: Hello
# Received: World!