Skip to content

API

App

App(content: ElementFactory, state_resolver: StateResolver | None = None, page_factory: PageFactory = default_page)

Methods:

Name Description
__call__
Source code in rxxxt/app.py
37
38
39
40
def __init__(self, content: ElementFactory, state_resolver: StateResolver | None = None, page_factory: PageFactory = default_page) -> None:
  """
  Stores the collaborators the app was constructed with.
  args:
    content: ElementFactory - kept as the app's content factory
    state_resolver: StateResolver | None - kept when truthy, otherwise the result of `default_state_resolver()` is used
    page_factory: PageFactory - kept as the app's page factory
  """
  self._content = content
  self._page_factory: PageFactory = page_factory
  if not state_resolver:
    state_resolver = default_state_resolver()
  self._state_resolver = state_resolver

__call__ async

__call__(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any
Source code in rxxxt/app.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async def __call__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
  """
  ASGI entry point; dispatches on `scope["type"]`.
  "http": the request is passed to `_handle_http`. A ValidationError or ValueError is answered with
  "bad request" (400), any other exception with "internal server error" (500).
  "websocket": the connection is passed to `_ws_session`. On any exception the socket is closed with
  code 1011, and afterwards it is closed again if it still reports being connected.
  Exceptions are logged at debug level. Any other scope type is ignored.
  """
  scope_type = scope["type"]
  if scope_type == "http":
    http_context = HTTPContext(scope, receive, send)
    try:
      await self._handle_http(http_context)
    except (ValidationError, ValueError) as error:
      logging.debug(error)
      return await http_context.respond_text("bad request", 400)
    except BaseException as error:
      # NOTE: BaseException also covers asyncio.CancelledError and KeyboardInterrupt
      logging.debug(error)
      return await http_context.respond_text("internal server error", 500)
  elif scope_type == "websocket":
    ws_context = WebsocketContext(scope, receive, send)
    try:
      await self._ws_session(ws_context)
    except BaseException as error:
      logging.debug(error)
      await ws_context.close(1011, "Internal error")
    finally:
      if ws_context.connected:
        await ws_context.close()

Component

Component()

Bases: Element

Methods:

Name Description
add_job

Runs a background job until completion. Only runs when the session is persistent.

add_worker

Runs a background worker, which may be cancelled at any time. Only runs when the session is persistent.

lc_destroy
lc_handle_event
lc_init
lc_render
on_after_destroy
on_after_update
on_before_destroy
on_before_update
on_init
render
tonode

Attributes:

Name Type Description
context Context
Source code in rxxxt/component.py
79
80
81
82
83
def __init__(self) -> None:
  """
  Sets up the empty task lists for background jobs and workers.
  `context` is only declared here; `lc_init` assigns it.
  """
  super().__init__()
  self.context: Context
  self._job_tasks: list[asyncio.Task] = []
  self._worker_tasks: list[asyncio.Task] = []

context instance-attribute

context: Context

add_job

add_job(a: Coroutine)

Runs a background job until completion. Only runs when the session is persistent. args: a: Coroutine - the coroutine that should be run

Source code in rxxxt/component.py
88
89
90
91
92
93
94
95
96
def add_job(self, a: Coroutine):
  """
  Runs a background job until completion. Only runs when the session is persistent.
  When the session is not persistent, the coroutine is closed without being run.
  args:
    a: Coroutine - the coroutine that should be run
  """
  if self.context.config.persistent:
    # Jobs go into `_job_tasks`: `lc_destroy` awaits that list to completion,
    # while every task in `_worker_tasks` is cancelled on destroy.
    self._job_tasks.append(asyncio.create_task(a))
  else: a.close()

add_worker

add_worker(a: Coroutine)

Runs a background worker, which may be cancelled at any time. Only runs when the session is persistent. args: a: Coroutine - the coroutine that should be run

Source code in rxxxt/component.py
 97
 98
 99
100
101
102
103
104
105
def add_worker(self, a: Coroutine):
  """
  Starts a background worker that may be cancelled at any time (`lc_destroy` cancels all workers).
  Only runs when the session is persistent; otherwise the coroutine is closed without being run.
  args:
    a: Coroutine - the coroutine that should be run
  """
  if not self.context.config.persistent:
    a.close()
    return
  worker_task = asyncio.create_task(a)
  self._worker_tasks.append(worker_task)

lc_destroy async

lc_destroy() -> None
Source code in rxxxt/component.py
116
117
118
119
120
121
122
123
124
125
126
127
async def lc_destroy(self) -> None:
  """
  Tears the component down:
  1. awaits `on_before_destroy`
  2. waits for all job tasks to finish, then forgets them
  3. cancels all worker tasks, waits for them, then forgets them
  4. awaits `on_after_destroy`
  An asyncio.CancelledError raised while waiting in step 2 or 3 is ignored.
  """
  await self.on_before_destroy()

  if self._job_tasks:
    try:
      await asyncio.wait(self._job_tasks)
    except asyncio.CancelledError:
      pass
    self._job_tasks.clear()

  if self._worker_tasks:
    for worker_task in self._worker_tasks:
      worker_task.cancel()
    try:
      await asyncio.wait(self._worker_tasks)
    except asyncio.CancelledError:
      pass
    self._worker_tasks.clear()

  await self.on_after_destroy()

lc_handle_event async

lc_handle_event(event: dict[str, int | float | str | bool])
Source code in rxxxt/component.py
129
130
131
132
133
134
async def lc_handle_event(self, event: dict[str, int | float | str | bool]):
  """
  Dispatches an incoming event to the handler named by its "$handler_name" entry.
  The entry is removed from `event`; the remaining entries are passed to the handler as keyword arguments.
  Nothing happens when the name is not a string or the attribute is not an InstanceEventHandler.
  args:
    event: dict - the event data, including "$handler_name"
  """
  handler_name = event.pop("$handler_name", None)
  if not isinstance(handler_name, str):
    return
  handler = getattr(self, handler_name, None) # NOTE: this is risky!!
  if not isinstance(handler, InstanceEventHandler):
    return
  await to_awaitable(handler, **event)

lc_init async

lc_init(context: Context) -> None
Source code in rxxxt/component.py
107
108
109
async def lc_init(self, context: Context) -> None:
  """
  Binds the component to its context, then awaits the `on_init` hook.
  args:
    context: Context - stored as `self.context` before `on_init` runs
  """
  self.context = context
  await self.on_init()

lc_render async

lc_render() -> Element
Source code in rxxxt/component.py
111
112
113
114
115
async def lc_render(self) -> Element:
  """
  Renders the component: awaits `on_before_update`, then `render` (through `to_awaitable`),
  then `on_after_update`, and returns what `render` produced.
  """
  await self.on_before_update()
  rendered = await to_awaitable(self.render)
  await self.on_after_update()
  return rendered

on_after_destroy async

on_after_destroy() -> None
Source code in rxxxt/component.py
140
async def on_after_destroy(self) -> None:
  """Hook awaited as the last step of `lc_destroy`, after job and worker tasks have been cleared. Does nothing by default."""
  ...

on_after_update async

on_after_update() -> None
Source code in rxxxt/component.py
138
async def on_after_update(self) -> None:
  """Hook awaited by `lc_render` after `render` has completed. Does nothing by default."""
  ...

on_before_destroy async

on_before_destroy() -> None
Source code in rxxxt/component.py
139
async def on_before_destroy(self) -> None:
  """Hook awaited as the first step of `lc_destroy`, before job and worker tasks are handled. Does nothing by default."""
  ...

on_before_update async

on_before_update() -> None
Source code in rxxxt/component.py
137
async def on_before_update(self) -> None:
  """Hook awaited by `lc_render` before `render` is called. Does nothing by default."""
  ...

on_init async

on_init() -> None
Source code in rxxxt/component.py
136
async def on_init(self) -> None:
  """Hook awaited by `lc_init` after `self.context` has been assigned. Does nothing by default."""
  ...

render abstractmethod

render() -> Element | Awaitable[Element]
Source code in rxxxt/component.py
85
86
@abstractmethod
def render(self) -> Element | Awaitable[Element]:
  """
  Produces the element for this component.
  `lc_render` calls this through `to_awaitable`, so the element may be returned directly or as an awaitable.
  """
  ...

tonode

tonode(context: Context) -> Node
Source code in rxxxt/component.py
142
def tonode(self, context: Context) -> 'Node':
  """
  Creates the `ComponentNode` for this component.
  args:
    context: Context - passed to the node together with this component
  """
  node = ComponentNode(context, self)
  return node

event_handler

event_handler(**kwargs)
Source code in rxxxt/component.py
65
66
67
68
def event_handler(**kwargs):
  """
  Decorator factory: validates `kwargs` as ContextInputEventHandlerOptions and returns a decorator
  that wraps the decorated function in a ClassEventHandler carrying those options.
  args:
    kwargs - option values, validated with `ContextInputEventHandlerOptions.model_validate`
  """
  handler_options = ContextInputEventHandlerOptions.model_validate(kwargs)

  def _inner(fn) -> ClassEventHandler:
    return ClassEventHandler(fn, handler_options)

  return _inner

HandleNavigate

HandleNavigate(location: str)

Bases: CustomAttribute

Methods:

Name Description
get_key_value

Attributes:

Name Type Description
location
Source code in rxxxt/component.py
71
72
73
def __init__(self, location: str) -> None:
  """
  args:
    location: str - the location that `get_key_value` embeds in its navigate call
  """
  super().__init__()
  self.location = location

location instance-attribute

location = location

get_key_value

get_key_value(original_key: str) -> tuple[str, str]
Source code in rxxxt/component.py
75
76
def get_key_value(self, original_key: str) -> tuple[str, str]:
  """
  Keeps the attribute key and uses a `window.rxxxt.navigate(...)` call for `self.location` as its value.
  args:
    original_key: str - the attribute key, returned unchanged
  """
  # NOTE(review): the location is placed between single quotes without escaping;
  # a location containing `'` would break the snippet - confirm callers only pass trusted values.
  script = f"window.rxxxt.navigate('{self.location}');"
  return (original_key, script)

CustomAttribute

Bases: ABC

Methods:

Name Description
get_key_value

get_key_value abstractmethod

get_key_value(original_key: str) -> tuple[str, str | None]
Source code in rxxxt/elements.py
 9
10
@abstractmethod
def get_key_value(self, original_key: str) -> tuple[str, str | None]:
  """
  Returns the `(key, value)` pair to use for the attribute this object was passed as.
  `HTMLVoidElement.__init__` calls this with the attribute's original key and stores the returned value under the returned key.
  """
  ...

El

Element

Bases: ABC

Methods:

Name Description
tonode

tonode abstractmethod

tonode(context: Context) -> Node
Source code in rxxxt/elements.py
13
14
@abstractmethod
def tonode(self, context: Context) -> Node:
  """
  Creates the Node that represents this element.
  args:
    context: Context - the context the node is created with
  """
  ...

ElementFactory

Bases: Protocol

Methods:

Name Description
__call__

__call__

__call__() -> Element
Source code in rxxxt/elements.py
105
def __call__(self) -> Element:
  """Protocol signature: called without arguments, returns an Element."""
  ...

HTMLElement

HTMLElement(tag: str, attributes: dict[str, HTMLAttributeValue], content: ElementContent)

Bases: HTMLVoidElement

Methods:

Name Description
tonode
Source code in rxxxt/elements.py
53
54
55
def __init__(self, tag: str, attributes: dict[str, HTMLAttributeValue], content: ElementContent) -> None:
  """
  args:
    tag: str - passed on to the HTMLVoidElement initializer
    attributes: dict[str, HTMLAttributeValue] - passed on to the HTMLVoidElement initializer
    content: ElementContent - kept by reference and converted to child nodes in `tonode`
  """
  super().__init__(tag, attributes)
  self._content = content

tonode

tonode(context: Context) -> Node
Source code in rxxxt/elements.py
57
58
def tonode(self, context: Context) -> 'Node':
  """
  Creates an `ElementNode` with this element's tag and attributes; the children come from
  `element_content_to_nodes` applied to the stored content.
  args:
    context: Context - used for the node and for converting the content
  """
  children = element_content_to_nodes(context, self._content)
  return ElementNode(context, self._tag, self._attributes, children)

HTMLFragment

HTMLFragment(content: ElementContent)

Bases: Element

Methods:

Name Description
tonode
Source code in rxxxt/elements.py
29
30
31
def __init__(self, content: ElementContent) -> None:
  """
  args:
    content: ElementContent - kept by reference and converted to child nodes in `tonode`
  """
  super().__init__()
  self._content = content

tonode

tonode(context: Context) -> Node
Source code in rxxxt/elements.py
33
34
def tonode(self, context: Context) -> Node:
  """
  Creates a `FragmentNode` whose children come from `element_content_to_nodes` applied to the stored content.
  args:
    context: Context - used for the node and for converting the content
  """
  children = element_content_to_nodes(context, self._content)
  return FragmentNode(context, children)

HTMLVoidElement

HTMLVoidElement(tag: str, attributes: dict[str, HTMLAttributeValue])

Bases: Element

Methods:

Name Description
tonode
Source code in rxxxt/elements.py
37
38
39
40
41
42
43
44
45
46
47
def __init__(self, tag: str, attributes: dict[str, HTMLAttributeValue]) -> None:
  """
  Stores the tag and normalizes the attributes to `str | None` values:
  - CustomAttribute: replaced by the key and value from its `get_key_value`
  - False: the attribute is dropped
  - True: the attribute is kept with value None
  - int / float: converted with `str`
  - anything else: stored unchanged
  args:
    tag: str - the element's tag
    attributes: dict[str, HTMLAttributeValue] - the raw attributes
  """
  super().__init__()
  self._tag = tag
  self._attributes: dict[str, str | None] = {}
  for key, value in attributes.items():
    if isinstance(value, CustomAttribute):
      key, value = value.get_key_value(key)
    elif isinstance(value, bool):
      # bool is tested before int/float because bool is a subclass of int
      if not value:
        continue
      value = None
    elif isinstance(value, (int, float)):
      value = str(value)
    self._attributes[key] = value

tonode

tonode(context: Context) -> Node
Source code in rxxxt/elements.py
49
50
def tonode(self, context: Context) -> 'Node':
  """
  Creates a `VoidElementNode` with this element's tag and normalized attributes.
  args:
    context: Context - passed to the node
  """
  node = VoidElementNode(context, self._tag, self._attributes)
  return node

UnescapedHTMLElement

UnescapedHTMLElement(text: str)

Bases: Element

Methods:

Name Description
tonode
Source code in rxxxt/elements.py
72
73
74
def __init__(self, text: str) -> None:
  """
  args:
    text: str - kept as-is and handed to a `TextNode` in `tonode`
  """
  super().__init__()
  self._text = text

tonode

tonode(context: Context) -> Node
Source code in rxxxt/elements.py
76
def tonode(self, context: Context) -> 'Node':
  """
  Creates a `TextNode` holding the stored text.
  args:
    context: Context - passed to the node
  """
  node = TextNode(context, self._text)
  return node

VEl

ContextInputEventHandlerOptions

Bases: BaseModel

Attributes:

Name Type Description
debounce int | None
prevent_default bool
throttle int | None

debounce class-attribute instance-attribute

debounce: int | None = None

prevent_default class-attribute instance-attribute

prevent_default: bool = False

throttle class-attribute instance-attribute

throttle: int | None = None

Context

Context(state: State, config: ContextConfig, stack: ContextStack)

Methods:

Name Description
add_query_selector_event
add_window_event
delete_cookie
get_header
navigate
remove_query_selector_event
remove_window_event
replace_index
request_update
set_cookie
sub
subscribe
unsubscribe
unsubscribe_all
use_websocket

Attributes:

Name Type Description
config
cookies dict[str, str]
id
location
path
query_string
sid
stack_sids
state
Source code in rxxxt/execution.py
148
149
150
151
def __init__(self, state: State, config: ContextConfig, stack: ContextStack) -> None:
  """
  args:
    state: State - stored as the public `state` attribute
    config: ContextConfig - stored privately
    stack: ContextStack - stored privately; `sub` and `replace_index` derive new stacks from it
  """
  self.state = state
  self._config = config
  self._stack: ContextStack = stack

config property

config

cookies property

cookies: dict[str, str]

id property

id

location property

location

path property

path

query_string property

query_string

sid cached property

sid

stack_sids property

stack_sids

state instance-attribute

state = state

add_query_selector_event

add_query_selector_event(selector: str, name: str, descriptor: ContextInputEventDescriptor | ContextInputEventDescriptorGenerator, all: bool = False)
Source code in rxxxt/execution.py
210
211
def add_query_selector_event(self, selector: str, name: str, descriptor: ContextInputEventDescriptor | ContextInputEventDescriptorGenerator, all: bool = False):
  """
  Forwards to `_modify_query_selector_event` with the mode "add".
  args:
    selector: str - forwarded unchanged
    name: str - forwarded unchanged
    descriptor - forwarded unchanged
    all: bool - forwarded unchanged
  """
  mode = "add"
  self._modify_query_selector_event(selector, name, descriptor, all, mode)

add_window_event

add_window_event(name: str, descriptor: ContextInputEventDescriptor | ContextInputEventDescriptorGenerator)
Source code in rxxxt/execution.py
208
209
def add_window_event(self, name: str, descriptor: ContextInputEventDescriptor | ContextInputEventDescriptorGenerator):
  """
  Forwards to `_modify_window_event` with the mode "add".
  args:
    name: str - forwarded unchanged
    descriptor - forwarded unchanged
  """
  mode = "add"
  self._modify_window_event(name, descriptor, mode)

delete_cookie

delete_cookie(name: str)
Source code in rxxxt/execution.py
225
226
def delete_cookie(self, name: str):
  """
  Queues a SetCookieOutputEvent for `name` with `max_age=-1`.
  args:
    name: str - the cookie name
  """
  removal = SetCookieOutputEvent(name=name, max_age=-1)
  self.state.add_output_event(removal)

get_header

get_header(name: str) -> list[str]
Source code in rxxxt/execution.py
198
199
200
201
def get_header(self, name: str) -> list[str]:
  """
  Returns the lines stored under the state key "!header;<name>", or an empty list when nothing is stored.
  The value is read through `_get_state_str_subscribe`.
  args:
    name: str - the header name, used verbatim in the state key
  """
  raw_lines = self._get_state_str_subscribe(f"!header;{name}")
  return [] if raw_lines is None else raw_lines.splitlines()

navigate

navigate(location: str)
Source code in rxxxt/execution.py
218
219
220
def navigate(self, location: str):
  """
  Writes `location` to the "!location" state key, then queues a NavigateOutputEvent for it.
  args:
    location: str - the new location
  """
  self.state.update_state_strs({"!location": location})
  navigate_event = NavigateOutputEvent(location=location)
  self.state.add_output_event(navigate_event)

remove_query_selector_event

remove_query_selector_event(selector: str, name: str, descriptor: ContextInputEventDescriptor | ContextInputEventDescriptorGenerator, all: bool = False)
Source code in rxxxt/execution.py
215
216
def remove_query_selector_event(self, selector: str, name: str, descriptor: ContextInputEventDescriptor | ContextInputEventDescriptorGenerator, all: bool = False):
  """
  Forwards to `_modify_query_selector_event` with the mode "remove".
  args:
    selector: str - forwarded unchanged
    name: str - forwarded unchanged
    descriptor - forwarded unchanged
    all: bool - forwarded unchanged
  """
  mode = "remove"
  self._modify_query_selector_event(selector, name, descriptor, all, mode)

remove_window_event

remove_window_event(name: str, descriptor: ContextInputEventDescriptor | ContextInputEventDescriptorGenerator)
Source code in rxxxt/execution.py
213
214
def remove_window_event(self, name: str, descriptor: ContextInputEventDescriptor | ContextInputEventDescriptorGenerator):
  """
  Forwards to `_modify_window_event` with the mode "remove".
  args:
    name: str - forwarded unchanged
    descriptor - forwarded unchanged
  """
  mode = "remove"
  self._modify_window_event(name, descriptor, mode)

replace_index

replace_index(key: str)
Source code in rxxxt/execution.py
194
195
196
def replace_index(self, key: str):
  """
  Returns a new Context (same state and config) whose last stack entry is replaced by `key`.
  Only allowed when the last stack entry is an int.
  args:
    key: str - the replacement for the last stack entry
  raises:
    ValueError - if the last stack entry is not an int
  """
  if not isinstance(self._stack[-1], int):
    raise ValueError("No index to replace!")
  new_stack = self._stack[:-1] + (key,)
  return Context(self.state, self._config, new_stack)

request_update

request_update()
Source code in rxxxt/execution.py
203
def request_update(self):
  """Asks the state to update this context by passing `{self.id}` to `request_context_updates`."""
  own_ids = { self.id }
  self.state.request_context_updates(own_ids)

set_cookie

set_cookie(name: str, value: str, expires: datetime | None = None, path: str | None = None, secure: bool | None = None, http_only: bool | None = None, domain: str | None = None, max_age: int | None = None)
Source code in rxxxt/execution.py
222
223
224
def set_cookie(self, name: str, value: str, expires: datetime | None = None, path: str | None = None,
              secure: bool | None = None, http_only: bool | None = None, domain: str | None = None, max_age: int | None = None):
  """
  Queues a SetCookieOutputEvent built from the given arguments.
  args:
    name, value: str - the cookie name and value
    expires, path, secure, http_only, domain, max_age - optional settings, passed to the event unchanged
  """
  cookie_event = SetCookieOutputEvent(
    name=name,
    value=value,
    expires=expires,
    path=path,
    secure=secure,
    http_only=http_only,
    domain=domain,
    max_age=max_age,
  )
  self.state.add_output_event(cookie_event)

sub

sub(key: ContextStackKey)
Source code in rxxxt/execution.py
193
def sub(self, key: ContextStackKey):
  """
  Returns a new Context (same state and config) whose stack is this stack with `key` appended.
  args:
    key: ContextStackKey - the entry to append
  """
  child_stack = self._stack + (key,)
  return Context(self.state, self._config, child_stack)

subscribe

subscribe(key: str)
Source code in rxxxt/execution.py
204
def subscribe(self, key: str):
  """
  Subscribes this context (by `self.id`) to `key` on the state.
  args:
    key: str - the state key
  """
  state = self.state
  state.subscribe(self.id, key)

unsubscribe

unsubscribe(key: str)
Source code in rxxxt/execution.py
205
def unsubscribe(self, key: str):
  """
  Removes this context's (by `self.id`) subscription to `key` on the state.
  args:
    key: str - the state key
  """
  state = self.state
  state.unsubscribe(self.id, key)

unsubscribe_all

unsubscribe_all()
Source code in rxxxt/execution.py
206
def unsubscribe_all(self):
  """Forwards to the state's `unsubscribe_all` with this context's id."""
  state = self.state
  state.unsubscribe_all(self.id)

use_websocket

use_websocket(websocket: bool = True)
Source code in rxxxt/execution.py
221
def use_websocket(self, websocket: bool = True):
  """
  Queues a UseWebsocketOutputEvent carrying the given flag.
  args:
    websocket: bool - passed to the event, defaults to True
  """
  websocket_event = UseWebsocketOutputEvent(websocket=websocket)
  self.state.add_output_event(websocket_event)

State

State(update_event: Event)

State keys may have prefixes. These prefixes indicate how, when, and whether a key should be removed from the state. Prefixes: "#" = temporary - removed from the user data and session state if no longer used; "!" = protocol - removed from the user state if not used, but not purged from the session.

Methods:

Name Description
add_output_event
cleanup
delete_key
destroy
get_key_cell
get_key_str
init
pop_output_events
pop_updates
request_context_updates
request_key_updates
set_key_cell
subscribe
unsubscribe
unsubscribe_all
update_state_strs

Attributes:

Name Type Description
keys set[str]
user_data
Source code in rxxxt/execution.py
25
26
27
28
29
30
31
32
def __init__(self, update_event: asyncio.Event) -> None:
  """
  Creates an empty state: no stored strings or cells, no subscribers, no pending updates, no output events.
  args:
    update_event: asyncio.Event - kept for use by `_set_update_event`
  """
  self._update_event = update_event
  self._output_events: list[OutputEvent] = []
  self._pending_updates: set[ContextStack] = set()
  self._key_subscribers: dict[str, set[ContextStack]] = {}

  self._key_cell_store: dict[str, StateCell] = {}
  self._key_str_store: dict[str, str] = {}

keys property

keys: set[str]

user_data property

user_data

add_output_event

add_output_event(event: OutputEvent)
Source code in rxxxt/execution.py
93
94
95
def add_output_event(self, event: OutputEvent):
  """
  Appends `event` to the queued output events, then calls `_set_update_event`.
  args:
    event: OutputEvent - the event to queue
  """
  queue = self._output_events
  queue.append(event)
  self._set_update_event()

cleanup

cleanup()
Source code in rxxxt/execution.py
113
114
115
116
117
118
119
120
121
def cleanup(self):
  """
  Drops everything that is not among the keys returned by `_get_active_keys({ "#" })`:
  stored strings for other keys are removed, cells for other keys are removed and destroyed.
  Subscriber entries without any subscriber are removed as well.
  """
  # NOTE(review): "#" is presumably the temporary-key prefix - confirm against `_get_active_keys`
  active_keys = self._get_active_keys({ "#" })
  self._key_str_store = { key: text for key, text in self._key_str_store.items() if key in active_keys }

  stale_keys = [ key for key in self._key_cell_store if key not in active_keys ]
  for key in stale_keys:
    cell = self._key_cell_store.pop(key, None)
    if cell is not None:
      cell.destroy()

  self._key_subscribers = { key: subs for key, subs in self._key_subscribers.items() if len(subs) > 0 }

delete_key

delete_key(key: str)
Source code in rxxxt/execution.py
56
57
58
59
def delete_key(self, key: str):
  """
  Removes `key` from the string store and from the cell store; a removed cell is destroyed.
  Unknown keys are ignored.
  args:
    key: str - the key to delete
  """
  self._key_str_store.pop(key, None)
  removed_cell = self._key_cell_store.pop(key, None)
  if removed_cell is not None:
    removed_cell.destroy()

destroy

destroy()
Source code in rxxxt/execution.py
108
109
110
111
def destroy(self):
  """Destroys every registered cell, then empties the cell store."""
  for registered_cell in self._key_cell_store.values():
    registered_cell.destroy()

  self._key_cell_store.clear()

get_key_cell

get_key_cell(key: str)
Source code in rxxxt/execution.py
51
def get_key_cell(self, key: str):
  """
  Returns the cell registered for `key`, or None when there is none.
  args:
    key: str - the key to look up
  """
  cell = self._key_cell_store.get(key)
  return cell

get_key_str

get_key_str(key: str)
Source code in rxxxt/execution.py
46
47
48
49
50
def get_key_str(self, key: str):
  """
  Returns the string for `key`: the stored string if there is one, otherwise the serialized value
  of the cell registered for `key`, otherwise None.
  args:
    key: str - the key to look up
  """
  stored = self._key_str_store.get(key)
  if stored is not None:
    return stored
  cell = self._key_cell_store.get(key)
  if cell is None:
    return None
  # NOTE(review): the method is spelled "serlialize" - presumably matching the cell API; confirm before renaming
  return cell.serlialize()

init

init(k_str_store: dict[str, str])
Source code in rxxxt/execution.py
44
def init(self, k_str_store: dict[str, str]):
  """
  Merges `k_str_store` into the string store; existing keys are overwritten.
  args:
    k_str_store: dict[str, str] - the entries to merge
  """
  self._key_str_store.update(k_str_store)

pop_output_events

pop_output_events()
Source code in rxxxt/execution.py
 97
 98
 99
100
def pop_output_events(self):
  """Returns the queued output events and replaces the queue with a new empty list."""
  popped, self._output_events = self._output_events, []
  return popped

pop_updates

pop_updates()
Source code in rxxxt/execution.py
102
103
104
105
106
def pop_updates(self):
  """
  Returns the pending updates and replaces them with a new empty set.
  `_set_update_event` is called after the replacement.
  """
  popped, self._pending_updates = self._pending_updates, set()
  self._set_update_event()
  return popped

request_context_updates

request_context_updates(ids: set[ContextStack])
Source code in rxxxt/execution.py
72
73
74
def request_context_updates(self, ids: set[ContextStack]):
  """
  Adds all `ids` to the pending updates, then calls `_set_update_event`.
  args:
    ids: set[ContextStack] - the context ids to update
  """
  self._pending_updates.update(ids)
  self._set_update_event()

request_key_updates

request_key_updates(keys: set[str])
Source code in rxxxt/execution.py
76
77
78
def request_key_updates(self, keys: set[str]):
  """
  Requests an update for every context subscribed to at least one of `keys`.
  Keys without subscribers contribute nothing.
  args:
    keys: set[str] - the state keys that changed
  """
  ids: set[ContextStack] = set()
  for key in keys:
    ids.update(self._key_subscribers.get(key, ()))
  self.request_context_updates(ids)

set_key_cell

set_key_cell(key: str, cell: StateCell, overwrite: bool = False)
Source code in rxxxt/execution.py
52
53
54
55
def set_key_cell(self, key: str, cell: StateCell, overwrite: bool = False):
  """
  Registers `cell` under `key`.
  args:
    key: str - the key to register the cell for
    cell: StateCell - the cell to register
    overwrite: bool - allow replacing an already registered cell
  raises:
    ValueError - if a cell is already registered for `key` and `overwrite` is false
  """
  already_registered = key in self._key_cell_store
  if already_registered and not overwrite:
    raise ValueError(f"Cell already registered with key '{key}'!")
  self._key_cell_store[key] = cell

subscribe

subscribe(cid: ContextStack, key: str)
Source code in rxxxt/execution.py
80
81
def subscribe(self, cid: ContextStack, key: str):
  """
  Adds `cid` to the subscribers of `key`, creating the subscriber set on first use.
  args:
    cid: ContextStack - the subscribing context id
    key: str - the state key
  """
  if key not in self._key_subscribers:
    self._key_subscribers[key] = set()
  self._key_subscribers[key].add(cid)

unsubscribe

unsubscribe(cid: ContextStack, key: str)
Source code in rxxxt/execution.py
83
84
85
def unsubscribe(self, cid: ContextStack, key: str):
  """
  Removes `cid` from the subscribers of `key`. Unknown keys and non-subscribed ids are ignored.
  args:
    cid: ContextStack - the context id to remove
    key: str - the state key
  """
  subscribers = self._key_subscribers.get(key)
  if subscribers is None:
    return
  subscribers.discard(cid)

unsubscribe_all

unsubscribe_all(cid: ContextStack)
Source code in rxxxt/execution.py
87
88
89
90
91
def unsubscribe_all(self, cid: ContextStack):
  """
  Removes `cid` from the pending updates and from the subscribers of every key,
  then calls `_set_update_event`. Ids that are not present are ignored.
  args:
    cid: ContextStack - the context id to remove everywhere
  """
  self._pending_updates.discard(cid)
  for subscribers in self._key_subscribers.values():
    # Remove `cid` itself; testing the builtin `id` here would never match a subscriber.
    subscribers.discard(cid)
  self._set_update_event()

update_state_strs

update_state_strs(data: dict[str, str])
Source code in rxxxt/execution.py
61
62
63
64
65
66
67
68
69
70
def update_state_strs(self, data: dict[str, str]):
  """
  Writes each value of `data` into the StrStateCell for its key, registering a new StrStateCell
  where none exists, then requests updates for all keys of `data`.
  Entries processed before a failing one stay applied; no updates are requested in that case.
  args:
    data: dict[str, str] - the keys and their new string values
  raises:
    ValueError - if a key already has a cell that is not a StrStateCell
  """
  for key, text in data.items():
    existing = self.get_key_cell(key)
    if existing is None:
      self.set_key_cell(key, StrStateCell(text))
      continue
    if not isinstance(existing, StrStateCell):
      raise ValueError("invalid cell for location!")
    existing.value = text
  self.request_key_updates(set(data.keys()))

default_page

default_page(header: Element, content: Element, body_end: Element)
Source code in rxxxt/page.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def default_page(header: Element, content: Element, body_end: Element):
  """
  Builds the default page: a doctype followed by an `html` element.
  The `head` holds a UTF-8 charset meta, a viewport meta and `header`;
  the `body` holds `content` followed by `body_end`.
  args:
    header: Element - placed last in the head
    content: Element - placed first in the body
    body_end: Element - placed last in the body
  """
  doctype = VEl["!DOCTYPE"](html=None)
  head = El.head(content=[
    VEl.meta(charset="UTF-8"),
    VEl.meta(name="viewport", content="width=device-width, initial-scale=1.0"),
    header
  ])
  body = El.body(content=[ content, body_end ])
  document = El.html(content=[ head, body ])
  return HTMLFragment([ doctype, document ])

PageBuilder

PageBuilder(page_factory: PageFactory = default_page)

Bases: PageFactory

Methods:

Name Description
__call__
add_body_end
add_body_script
add_header
add_header_script
add_stylesheet
Source code in rxxxt/page.py
24
25
26
27
def __init__(self, page_factory: PageFactory = default_page) -> None:
  """
  Starts with no extra header or body-end elements.
  args:
    page_factory: PageFactory - the factory `__call__` delegates to, defaults to `default_page`
  """
  self._page_factory = page_factory
  self._header_elements: list[Element] = []
  self._body_end_elements: list[Element] = []

__call__

__call__(header: Element, content: Element, body_end: Element) -> Element
Source code in rxxxt/page.py
38
39
def __call__(self, header: Element, content: Element, body_end: Element) -> Element:
  """
  Calls the wrapped page factory with the collected elements mixed in:
  the added header elements follow `header`, the added body-end elements precede `body_end`.
  args:
    header: Element - the incoming header element
    content: Element - passed through unchanged
    body_end: Element - the incoming body-end element
  """
  full_header = HTMLFragment([ header, *self._header_elements ])
  full_body_end = HTMLFragment([ *self._body_end_elements, body_end ])
  return self._page_factory(full_header, content, full_body_end)

add_body_end

add_body_end(el: Element)
Source code in rxxxt/page.py
36
def add_body_end(self, el: Element):
  """
  Appends `el` to the body-end elements.
  args:
    el: Element - the element to append
  """
  self._body_end_elements.append(el)

add_body_script

add_body_script(url: str, content: list[Element | str] = [], **kwargs: str)
Source code in rxxxt/page.py
32
33
def add_body_script(self, url: str, content: list[Element | str] | None = None, **kwargs: str):
  """
  Adds the element created by `El.script(src=url, ...)` to the body-end elements.
  args:
    url: str - used as the script's `src`
    content: list[Element | str] | None - the script's content; a new empty list is used when omitted
    kwargs: str - passed to `El.script` as additional keyword arguments
  """
  # A fresh list per call: a `[]` default would be one shared list handed to every script created without content.
  script_content = [] if content is None else content
  self.add_body_end(El.script(src=url, content=script_content, **kwargs))

add_header

add_header(el: Element)
Source code in rxxxt/page.py
35
def add_header(self, el: Element):
  """
  Appends `el` to the header elements.
  args:
    el: Element - the element to append
  """
  self._header_elements.append(el)

add_header_script

add_header_script(url: str, content: list[Element | str] = [], **kwargs: str)
Source code in rxxxt/page.py
30
31
def add_header_script(self, url: str, content: list[Element | str] | None = None, **kwargs: str):
  """
  Adds the element created by `El.script(src=url, ...)` to the header elements.
  args:
    url: str - used as the script's `src`
    content: list[Element | str] | None - the script's content; a new empty list is used when omitted
    kwargs: str - passed to `El.script` as additional keyword arguments
  """
  # A fresh list per call: a `[]` default would be one shared list handed to every script created without content.
  script_content = [] if content is None else content
  self.add_header(El.script(src=url, content=script_content, **kwargs))

add_stylesheet

add_stylesheet(url: str, **kwargs: str)
Source code in rxxxt/page.py
29
def add_stylesheet(self, url: str, **kwargs: str):
  """
  Adds a `link` element with `rel="stylesheet"` and `href=url` to the header elements.
  args:
    url: str - used as the link's `href`
    kwargs: str - passed to `VEl.link` as additional keyword arguments
  """
  stylesheet_link = VEl.link(rel="stylesheet", href=url, **kwargs)
  self.add_header(stylesheet_link)

Router

Router()

Bases: ElementFactory

Classes:

Name Description
RoutedComponent

Methods:

Name Description
__call__
add_route
route
Source code in rxxxt/router.py
93
94
def __init__(self) -> None:
  """Starts with an empty route list; `add_route` appends (pattern, element factory) pairs to it."""
  self._routes: list[tuple[PathPattern, ElementFactory]] = list()

RoutedComponent

RoutedComponent(routes: list[tuple[PathPattern, ElementFactory]])

Bases: Component

Methods:

Name Description
on_before_update
render

Attributes:

Name Type Description
params
Source code in rxxxt/router.py
70
71
72
73
def __init__(self, routes: list[tuple[PathPattern, ElementFactory]]):
  """
  args:
    routes: list[tuple[PathPattern, ElementFactory]] - kept by reference
  No route is selected until `on_before_update` runs.
  """
  super().__init__()
  self._selected_match: tuple[ElementFactory, dict[str, str]] | None = None
  self._routes = routes
params class-attribute instance-attribute
params = router_params()
on_before_update async
on_before_update() -> None
Source code in rxxxt/router.py
75
76
77
async def on_before_update(self) -> None:
  """
  Re-selects the current match via `_get_current_match` and updates `params`:
  the match's second item when there is a match, otherwise an empty dict.
  """
  current = self._get_current_match()
  self._selected_match = current
  if current is None:
    self.params = dict()
  else:
    self.params = current[1]
render
render() -> Element
Source code in rxxxt/router.py
79
80
81
82
83
def render(self) -> Element:
  """
  Returns the element created by the selected match's factory,
  or an `h1` with the text "Not found!" when no match is selected.
  """
  selected = self._selected_match
  if selected is not None:
    element_factory = selected[0]
    return element_factory()
  return El.h1(content=["Not found!"])

__call__

__call__() -> Element
Source code in rxxxt/router.py
103
def __call__(self) -> Element:
  """Creates a new `Router.RoutedComponent` over this router's route list."""
  routed = Router.RoutedComponent(self._routes)
  return routed

add_route

add_route(path: str, element_factory: ElementFactory)
Source code in rxxxt/router.py
96
def add_route(self, path: str, element_factory: ElementFactory):
  """
  Appends a route to the route list.
  args:
    path: str - wrapped in a `PathPattern`
    element_factory: ElementFactory - stored alongside the pattern
  """
  pattern = PathPattern(path)
  self._routes.append((pattern, element_factory))

route

route(path: str)
Source code in rxxxt/router.py
 97
 98
 99
100
101
def route(self, path: str):
  """
  Decorator form of `add_route`: registers the decorated element factory for `path`
  and returns the factory unchanged.
  args:
    path: str - the path passed to `add_route`
  """
  def _inner(element_factory: ElementFactory):
    self.add_route(path, element_factory)
    return element_factory

  return _inner

router_params

router_params()
Source code in rxxxt/router.py
64
def router_params():
  """Returns a `context_state` descriptor for a `dict[str, str]` stored under the name "*rp*"."""
  params_state = context_state(dict[str, str], "*rp*")
  return params_state

context_state

context_state(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/state.py
128
129
def context_state(default_factory: Callable[[], T], name: str | None = None):
  """
  Creates a StateDescriptor whose key is produced by `get_context_state_key`.
  args:
    default_factory: Callable[[], T] - passed to the descriptor
    name: str | None - passed to the descriptor as `state_name`
  """
  descriptor = StateDescriptor(get_context_state_key, default_factory, state_name=name)
  return descriptor

context_state_box

context_state_box(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/state.py
137
138
def context_state_box(default_factory: Callable[[], T], name: str | None = None):
  """
  Creates a StateBoxDescriptor whose key is produced by `get_context_state_key`.
  args:
    default_factory: Callable[[], T] - passed to the descriptor
    name: str | None - passed to the descriptor as `state_name`
  """
  descriptor = StateBoxDescriptor(get_context_state_key, default_factory, state_name=name)
  return descriptor

default_state_resolver

default_state_resolver() -> JWTStateResolver

Creates a JWTStateResolver. Uses the environment variable JWT_SECRET as its secret, if set, otherwise creates a new random, temporary secret.

Source code in rxxxt/state.py
206
207
208
209
210
211
212
213
214
215
def default_state_resolver() -> JWTStateResolver:
  """
  Creates a JWTStateResolver.
  The secret is the UTF-8 encoded value of the environment variable `JWT_SECRET` when it is set;
  otherwise 64 random bytes are generated, which only last for this process.
  """
  env_secret = os.getenv("JWT_SECRET", None)
  if env_secret is not None:
    return JWTStateResolver(env_secret.encode("utf-8"))
  return JWTStateResolver(secrets.token_bytes(64))

global_state

global_state(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/state.py
125
126
def global_state(default_factory: Callable[[], T], name: str | None = None):
  """
  Creates a StateDescriptor whose key is produced by `get_global_state_key`.
  args:
    default_factory: Callable[[], T] - passed to the descriptor
    name: str | None - passed to the descriptor as `state_name`
  """
  descriptor = StateDescriptor(get_global_state_key, default_factory, state_name=name)
  return descriptor

global_state_box

global_state_box(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/state.py
134
135
def global_state_box(default_factory: Callable[[], T], name: str | None = None):
  """
  Creates a StateBoxDescriptor whose key is produced by `get_global_state_key`.
  args:
    default_factory: Callable[[], T] - passed to the descriptor
    name: str | None - passed to the descriptor as `state_name`
  """
  descriptor = StateBoxDescriptor(get_global_state_key, default_factory, state_name=name)
  return descriptor

JWTStateResolver

JWTStateResolver(secret: bytes, max_age: timedelta | None = None, algorithm: Literal['HS256'] | Literal['HS384'] | Literal['HS512'] = 'HS512')

Bases: StateResolver

Methods:

Name Description
create_token
resolve
Source code in rxxxt/state.py
149
150
151
152
153
154
def __init__(self, secret: bytes, max_age: timedelta | None = None, algorithm: Literal["HS256"] | Literal["HS384"] | Literal["HS512"] = "HS512") -> None:
  """
  args:
    secret: bytes - the HMAC key used to sign and verify tokens
    max_age: timedelta | None - token lifetime, one day when omitted
    algorithm: "HS256" | "HS384" | "HS512" - selects the hash (sha256 / sha384 / sha512); any other value raises KeyError
  """
  super().__init__()
  self._secret = secret
  self._algorithm = algorithm
  digests = { "HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512 }
  self._digest = digests[algorithm]
  if max_age is None:
    max_age = timedelta(days=1)
  self._max_age: timedelta = max_age

create_token

create_token(data: dict[str, str], old_token: str | None) -> str
Source code in rxxxt/state.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def create_token(self, data: dict[str, str], old_token: str | None) -> str:
  """
  Creates a signed token of the form `<header>.<payload>.<signature>`, each part encoded with `_b64url_encode`.
  The header carries `typ` "JWT" and the configured algorithm; the payload carries `exp`
  (now + max age, as a UTC timestamp in whole seconds) and `data`.
  The signature is the HMAC of `<header>.<payload>` with the configured secret and digest.
  args:
    data: dict[str, str] - stored in the payload under "data"
    old_token: str | None - not used by this implementation
  """
  encode = JWTStateResolver._b64url_encode
  expires_at = datetime.now(tz=timezone.utc) + self._max_age
  payload = { "exp": int(expires_at.timestamp()), "data": data }
  header = {
    "typ": "JWT",
    "alg": self._algorithm
  }

  signing_input = b".".join([
    encode(json.dumps(header).encode("utf-8")),
    encode(json.dumps(payload).encode("utf-8")),
  ])
  signature = hmac.digest(self._secret, signing_input, self._digest)
  token = b".".join([ signing_input, encode(signature) ])
  return token.decode("utf-8")

resolve

resolve(token: str) -> dict[str, str] | Awaitable[dict[str, str]]
Source code in rxxxt/state.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def resolve(self, token: str) -> dict[str, str] | Awaitable[dict[str, str]]:
  """
  Verifies a token and returns the state data it carries.
  Checks, in order: three dot-separated parts; header (`typ` "JWT", `alg` equal to the configured algorithm);
  HMAC signature over everything before the last dot; payload shape (`exp` int, `data` dict);
  expiry; validation of `data` with `StateDataAdapter`.
  args:
    token: str - the serialized token
  raises:
    StateResolverError - if one of the checks above fails
  """
  rtoken = token.encode("utf-8")
  sig_start = rtoken.rfind(b".")
  if sig_start == -1: raise StateResolverError("Invalid token format")
  parts = rtoken.split(b".")
  if len(parts) != 3: raise StateResolverError("Invalid token format")

  try: header = json.loads(JWTStateResolver._b64url_decode(parts[0]))
  # `Exception` rather than a bare `except`: KeyboardInterrupt / SystemExit must not be reported as a bad token.
  except Exception as e: raise StateResolverError("Invalid token header") from e

  if not isinstance(header, dict) or header.get("typ") != "JWT" or header.get("alg") != self._algorithm:
    raise StateResolverError("Invalid header contents")

  # NOTE(review): decoding the signature (and the payload below) is not guarded; a malformed segment
  # propagates whatever `_b64url_decode` / `json.loads` raise - confirm that callers expect this.
  signature = JWTStateResolver._b64url_decode(rtoken[(sig_start + 1):])
  actual_signature = hmac.digest(self._secret, rtoken[:sig_start], self._digest)
  # constant-time comparison
  if not hmac.compare_digest(signature, actual_signature):
    raise StateResolverError("Invalid JWT signature!")

  # The payload is only parsed after the signature has been verified.
  payload = json.loads(JWTStateResolver._b64url_decode(parts[1]))
  if not isinstance(payload, dict) or not isinstance(payload.get("exp"), int) or not isinstance(payload.get("data"), dict):
    raise StateResolverError("Invalid JWT payload!")

  expires_dt = datetime.fromtimestamp(payload["exp"], timezone.utc)
  if expires_dt < datetime.now(tz=timezone.utc):
    raise StateResolverError("JWT expired!")

  try: state_data = StateDataAdapter.validate_python(payload["data"])
  # keep the validation error attached as the cause
  except ValidationError as e: raise StateResolverError(e) from e
  return state_data

local_state

local_state(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/state.py
122
123
def local_state(default_factory: Callable[[], T], name: str | None = None):
  """
  Creates a StateDescriptor whose key is produced by `get_local_state_key`.
  args:
    default_factory: Callable[[], T] - passed to the descriptor
    name: str | None - passed to the descriptor as `state_name`
  """
  descriptor = StateDescriptor(get_local_state_key, default_factory, state_name=name)
  return descriptor

local_state_box

local_state_box(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/state.py
131
132
def local_state_box(default_factory: Callable[[], T], name: str | None = None):
  """
  Creates a StateBoxDescriptor whose key is produced by `get_local_state_key`.
  args:
    default_factory: Callable[[], T] - passed to the descriptor
    name: str | None - passed to the descriptor as `state_name`
  """
  descriptor = StateBoxDescriptor(get_local_state_key, default_factory, state_name=name)
  return descriptor

StateResolver

Bases: ABC

Methods:

Name Description
create_token
resolve

create_token abstractmethod

create_token(data: dict[str, str], old_token: str | None) -> str | Awaitable[str]
Source code in rxxxt/state.py
143
144
@abstractmethod
def create_token(self, data: dict[str, str], old_token: str | None) -> str | Awaitable[str]:
  """
  Creates a token for `data`; the token may be returned directly or as an awaitable.
  args:
    data: dict[str, str] - the state data the token stands for
    old_token: str | None - presumably the previously issued token, if any - confirm against the caller
  """
  pass

resolve abstractmethod

resolve(token: str) -> dict[str, str] | Awaitable[dict[str, str]]
Source code in rxxxt/state.py
145
146
@abstractmethod
def resolve(self, token: str) -> dict[str, str] | Awaitable[dict[str, str]]:
  """
  Returns the state data for `token`, either directly or as an awaitable.
  args:
    token: str - a token as produced by `create_token`
  """
  pass

class_map

class_map(map: dict[str, bool])
Source code in rxxxt/utils.py
1
2
def class_map(map: dict[str, bool]):
  """
  Joins the keys of `map` whose value is truthy with single spaces, in the dict's order.
  args:
    map: dict[str, bool] - class name -> whether to include it
  """
  enabled = ( name for name, include in map.items() if include )
  return " ".join(enabled)