Custom tasks
To create a custom task you should know about IO Metdata.
There is are demos:
functional tasks
module: streamtasks.system.fntask
This is the simplest way of creating custom tasks.
This can NOT do everything that can be done with tasks.
example:
from streamtasks.system.fntask import fntask
@fntask()
def adder(a: int, b: int) -> int: return a + b
if __name__ == "__main__": adder.run_sync()
When calling run
or run_sync
you can specify a link, a url or None. When specifying a link it will use the link as its connection. When providing a string, None or no argument it acts like creating a connection with connect
(see connection).
special arguments
timestamp: int
the timestamp of the newest input parameter
config: Any
The config must be a type that can be initialized by calling its type (to create the default config). It holds the configuration the task is started with.
state: Any
The state must be a type that can be initialized by calling its type. You can use the state to hold data that is task specific and is required accross calls.
Full example:
from streamtasks.system.fntask import fntask
from dataclasses import dataclass
@dataclass
class State:
count: int = 0
@dataclass
class Config:
step: int = 1
@fntask()
def demo2(a: int, state: State, config: Config) -> int:
state.count += config.step
return a + state.count
if __name__ == "__main__": demo2.run_sync()
You can also specify arguments for the fntask
decorator:
label
- specify a label for the task
thread_safe
- if the function is thread safe. Used to run synchronous functions in seperate threads.
You can annotate the return type(s) and input types with the Annotated
type. This allows setting the IO metadata and allowing the mapping of config values to the IO metadata. See IO Metdata or examples/fntask.py
for more information.
Full Tasks
You can implement anything that can be implemented with tasks by creating a class inheriting streamtasks.system.task.Task
.
For examples see: streamtasks/system/tasks/**