Skip to content

API

App

App(content: ElementFactory, state_resolver: StateResolver | None = None, page_factory: PageFactory = default_page, config: AppConfig | None = None)

Methods:

Name Description
__call__
Source code in rxxxt/app.py
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, content: ElementFactory, state_resolver: StateResolver | None = None, page_factory: PageFactory = default_page, \
    config: AppConfig | None = None) -> None:
  self._content = content
  self._page_factory: PageFactory = page_factory
  self._state_resolver = state_resolver or default_state_resolver()
  self._composer = Composer()
  self._config = config or AppConfig()
  _ = self._composer.add_handler(http_handler(routed_handler("/rxxxt-client.js")(self._http_static_rxxxt_client_js)))
  _ = self._composer.add_handler(http_handler(self._http_post_session))
  _ = self._composer.add_handler(http_handler(self._http_get_session))
  _ = self._composer.add_handler(websocket_handler(self._ws_session))
  _ = self._composer.add_handler(http_not_found_handler)

__call__ async

__call__(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any
Source code in rxxxt/app.py
39
40
async def __call__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
  return await self._composer(scope, receive, send)

ASGIFnReceive module-attribute

ASGIFnReceive = Callable[[], Awaitable[MutableMapping[str, Any]]]

ASGIFnSend module-attribute

ASGIFnSend = Callable[[MutableMapping[str, Any]], Awaitable[Any]]

ASGIHandler module-attribute

ASGIHandler = Callable[[ASGIScope, ASGIFnReceive, ASGIFnSend], Awaitable[Any]]

ASGINextException

Bases: Exception

ASGIScope module-attribute

ASGIScope = MutableMapping[str, Any]

Composer

Composer()

Methods:

Name Description
__call__
add_handler
Source code in rxxxt/asgi.py
211
212
def __init__(self) -> None:
  self._handlers: list[ASGIHandler] = []

__call__ async

__call__(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any
Source code in rxxxt/asgi.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
async def __call__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
  try:
    for handler in self._handlers:
      try: return await handler(scope, receive, send)
      except ASGINextException: pass
  except asyncio.CancelledError: raise
  except BaseException as e:
    Composer._error_count += 1
    if Composer._error_count == 1:
      print("ERROR during an ASGI request. Enable logging by calling `import logging; logging.basicConfig(level=logging.DEBUG)` at startup.", file=sys.stderr)
    logging.debug("Unhandled exception during a request", exc_info=True, stack_info=True)
    if scope["type"] == "websocket":
      return await self._ws_error_handler(WebsocketContext(scope, receive, send), e)
    if scope["type"] == "http":
      return await self._http_error_handler(HTTPContext(scope, receive, send), e)

add_handler

add_handler(handler: ASGIHandler)
Source code in rxxxt/asgi.py
214
215
216
def add_handler(self, handler: ASGIHandler):
  self._handlers.append(handler)
  return handler

http_handler

http_handler(fn: Callable[[HTTPContext], Awaitable[Any]])
Source code in rxxxt/asgi.py
183
184
185
186
187
def http_handler(fn: Callable[[HTTPContext], Awaitable[Any]]):
  async def _inner(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
    if scope["type"] != "http": raise ASGINextException()
    return await fn(HTTPContext(scope, receive, send))
  return _inner

http_not_found_handler async

http_not_found_handler(context: HTTPContext)
Source code in rxxxt/asgi.py
204
205
206
@http_handler
async def http_not_found_handler(context: HTTPContext):
  await context.respond_text("not found", 404)

HTTPContext

HTTPContext(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend)

Bases: TransportContext

Methods:

Name Description
add_response_headers
receive_bytes
receive_iter
receive_json
receive_json_raw
receive_text
respond_file
respond_text
response_body
response_start

Attributes:

Name Type Description
method
Source code in rxxxt/asgi.py
101
102
103
def __init__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> None:
  super().__init__(scope, receive, send)
  self._response_headers: list[tuple[BytesLike, BytesLike]] = []

method property

method

add_response_headers

add_response_headers(headers: ASGIHeaders)
Source code in rxxxt/asgi.py
108
def add_response_headers(self, headers: ASGIHeaders): self._response_headers.extend(headers)

receive_bytes async

receive_bytes() -> bytes
Source code in rxxxt/asgi.py
168
169
170
171
172
async def receive_bytes(self) -> bytes:
  stream = io.BytesIO()
  async for chunk in self.receive_iter():
    _ = stream.write(chunk)
  return stream.getvalue()

receive_iter async

receive_iter() -> AsyncGenerator[bytes, Any]
Source code in rxxxt/asgi.py
174
175
176
177
178
179
180
181
async def receive_iter(self) -> AsyncGenerator[bytes, Any]:
  while True:
    event = await self.receive()
    event_type = event.get("type")
    if event_type == "http.request":
      yield event.get("body", b"")
      if not event.get("more_body", False): return
    elif event_type == "http.disconnect": return

receive_json async

receive_json()
Source code in rxxxt/asgi.py
154
async def receive_json(self): return json.loads(await self.receive_json_raw())

receive_json_raw async

receive_json_raw()
Source code in rxxxt/asgi.py
156
async def receive_json_raw(self): return await self.receive_text({ "application/json" })

receive_text async

receive_text(allowed_mime_types: Iterable[str])
Source code in rxxxt/asgi.py
158
159
160
161
162
163
164
165
166
async def receive_text(self, allowed_mime_types: Iterable[str]):
  allowed_mime_types = allowed_mime_types if isinstance(allowed_mime_types, set) else set(allowed_mime_types)
  mime_type, ct_params = self.content_type
  if mime_type not in allowed_mime_types: raise ValueError(f"Mime type '{mime_type}' is not in allowed types!")
  charset = ct_params.get("charset", "utf-8")
  try: decoder = codecs.getdecoder(charset)
  except LookupError: raise ValueError("Invalid content-type encoding!")
  data = await self.receive_bytes()
  return decoder(data, "ignore")[0]

respond_file async

respond_file(path: str | Path, mime_type: str | None = None, handle_404: bool = False, use_last_modified: bool = False)
Source code in rxxxt/asgi.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
async def respond_file(self, path: str | pathlib.Path, mime_type: str | None = None, handle_404: bool = False, use_last_modified: bool = False):
  mime_type = mime_type or mimetypes.guess_type(path)[0]
  if mime_type is None: raise ValueError("Unknown mime type!")
  ppath = pathlib.Path(path)
  if handle_404 and not ppath.exists():
    return await self.respond_text("not found", 404)

  with open(ppath, "rb") as fd:
    fd_stat = os.stat(fd.fileno())

    if use_last_modified:
      last_modified = formatdate(fd_stat.st_mtime, usegmt=True).encode()
      self.add_response_headers([ (b"Last-Modified", last_modified) ])
      if (last_modified,) == self.headers.get("If-Modified-Since", None):
        await self.response_start(304)
        await self.response_body(b"", False)
        return

    self.add_response_headers(content_headers(fd_stat.st_size, mime_type))
    await self.response_start(200)
    while len(data := fd.read(1_000_000)) != 0:
      await self.response_body(data, fd.tell() != fd_stat.st_size)

respond_text async

respond_text(text: str, status: int = 200, mime_type: str = 'text/plain')
Source code in rxxxt/asgi.py
125
126
127
128
129
async def respond_text(self, text: str, status: int = 200, mime_type: str = "text/plain"):
  data = text.encode("utf-8")
  self.add_response_headers(content_headers(len(data), mime_type + "; charset=utf-8"))
  await self.response_start(status)
  await self.response_body(data, False)

response_body async

response_body(data: BytesLike, more_body: bool)
Source code in rxxxt/asgi.py
118
119
120
121
122
123
async def response_body(self, data: BytesLike, more_body: bool):
  await self.send({
    "type": "http.response.body",
    "body": data,
    "more_body": more_body
  })

response_start async

response_start(status: int, trailers: bool = False)
Source code in rxxxt/asgi.py
110
111
112
113
114
115
116
async def response_start(self, status: int, trailers: bool = False):
  await self.send({
    "type": "http.response.start",
    "status": status,
    "headers": self._response_headers,
    "trailers": trailers
  })

routed_handler

routed_handler(pattern: str)
Source code in rxxxt/asgi.py
196
197
198
199
200
201
202
def routed_handler(pattern: str):
  def _inner(fn: Callable[[CTXT, dict[str, str]], Awaitable[Any]]) -> Callable[[CTXT], Awaitable[Any]]:
    async def _inner_inner(context: CTXT) -> Any:
      if (match:=match_path(pattern, context.path)) is None: context.next()
      return await fn(context, match)
    return _inner_inner
  return _inner

TransportContext

TransportContext(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend)

Methods:

Name Description
next

Attributes:

Name Type Description
content_type
fullpath
headers
location
path
query_string str | None
receive
scope
send
Source code in rxxxt/asgi.py
19
20
21
22
def __init__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> None:
  self.scope = scope
  self.receive = receive
  self.send = send

content_type cached property

content_type

fullpath property

fullpath

headers cached property

headers

location property

location

path property

path

query_string property

query_string: str | None

receive instance-attribute

receive = receive

scope instance-attribute

scope = scope

send instance-attribute

send = send

next

next()
Source code in rxxxt/asgi.py
58
def next(self): raise ASGINextException()

websocket_handler

websocket_handler(fn: Callable[[WebsocketContext], Awaitable[Any]])
Source code in rxxxt/asgi.py
189
190
191
192
193
def websocket_handler(fn: Callable[[WebsocketContext], Awaitable[Any]]):
  async def _inner(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
    if scope["type"] != "websocket": raise ASGINextException()
    return await fn(WebsocketContext(scope, receive, send))
  return _inner

WebsocketContext

WebsocketContext(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend)

Bases: TransportContext

Methods:

Name Description
close
receive_message
send_message
setup

Attributes:

Name Type Description
connected
Source code in rxxxt/asgi.py
61
62
63
def __init__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> None:
  super().__init__(scope, receive, send)
  self._connected = True

connected property

connected

close async

close(code: int = 1000, reason: str = 'Normal Closure')
Source code in rxxxt/asgi.py
90
91
92
async def close(self, code: int = 1000, reason: str = "Normal Closure"):
  await self.send({ "type": "websocket.close", "code": code, "reason": reason })
  self._connected = False

receive_message async

receive_message() -> BytesLike | str
Source code in rxxxt/asgi.py
73
74
75
76
77
78
79
80
81
async def receive_message(self) -> BytesLike | str:
  while self._connected:
    event = await self.receive()
    if event["type"] == "websocket.disconnect":
      self._connected = False
      raise ConnectionError("Connection closed!")
    elif event["type"] == "websocket.receive":
      return event.get("bytes", event.get("text"))
  raise ConnectionError("Connection closed!")

send_message async

send_message(data: str | BytesLike)
Source code in rxxxt/asgi.py
83
84
85
86
87
88
async def send_message(self, data: str | BytesLike):
  if not self._connected: raise ConnectionError("Not connected!")
  event: dict[str, Any] = { "type": "websocket.send", "bytes": None, "text": None }
  if isinstance(data, str): event["text"] = data
  else: event["bytes"] = data
  await self.send(event)

setup async

setup(headers: ASGIHeaders = (), subprotocol: str | None = None)
Source code in rxxxt/asgi.py
68
69
70
71
async def setup(self, headers: ASGIHeaders = (), subprotocol: str | None = None):
  event = await self.receive()
  if event["type"] != "websocket.connect": raise ConnectionError("Did not receive connect event!")
  await self.send({ "type": "websocket.accept", "subprotocol": subprotocol, "headers": [ (name.lower(), value) for name, value in headers ] })

Component

Component()

Bases: Element

Methods:

Name Description
add_job

Runs a background job until completion. Only runs when the session is persistent.

add_worker

Runs a background worker, which may be cancelled at any time. Only runs when the session is persistent.

lc_destroy
lc_init
lc_render
on_after_destroy
on_after_update
on_before_destroy
on_before_update
on_init
render
tonode

Attributes:

Name Type Description
context Context
Source code in rxxxt/component.py
237
238
239
240
241
def __init__(self) -> None:
  super().__init__()
  self.context: Context
  self._worker_tasks: list[asyncio.Task[Any]] = []
  self._job_tasks: list[asyncio.Task[Any]] = []

context instance-attribute

context: Context

add_job

add_job(a: Coroutine[Any, Any, Any])

Runs a background job until completion. Only runs when the session is persistent. args: a: Coroutine - the coroutine that should be run

Source code in rxxxt/component.py
243
244
245
246
247
248
249
250
251
def add_job(self, a: Coroutine[Any, Any, Any]):
  """
  Runs a background job until completion. Only runs when the session is persistent.
  args:
    a: Coroutine - the coroutine that should be run
  """
  if self.context.config.persistent:
    self._worker_tasks.append(asyncio.create_task(a))
  else: a.close()

add_worker

add_worker(a: Coroutine[Any, Any, Any])

Runs a background worker, which may be cancelled at any time. Only runs when the session is persistent. args: a: Coroutine - the coroutine that should be run

Source code in rxxxt/component.py
252
253
254
255
256
257
258
259
260
def add_worker(self, a: Coroutine[Any, Any, Any]):
  """
  Runs a background worker, which may be cancelled at any time. Only runs when the session is persistent.
  args:
    a: Coroutine - the coroutine that should be run
  """
  if self.context.config.persistent:
    self._worker_tasks.append(asyncio.create_task(a))
  else: a.close()

lc_destroy async

lc_destroy() -> None
Source code in rxxxt/component.py
275
276
277
278
279
280
281
282
283
284
285
286
async def lc_destroy(self) -> None:
  await to_awaitable(self.on_before_destroy)
  if len(self._job_tasks) > 0:
    try: _ = await asyncio.wait(self._job_tasks)
    except asyncio.CancelledError: pass
    self._job_tasks.clear()
  if len(self._worker_tasks) > 0:
    for t in self._worker_tasks: _ = t.cancel()
    try: _ = await asyncio.wait(self._worker_tasks)
    except asyncio.CancelledError: pass
    self._worker_tasks.clear()
  await to_awaitable(self.on_after_destroy)

lc_init async

lc_init(context: Context) -> None
Source code in rxxxt/component.py
262
263
264
265
266
async def lc_init(self, context: Context) -> None:
  if hasattr(self, "context"):
    raise asyncio.InvalidStateError("Context already present, you must not use the instance of a component in twice.")
  self.context = context
  await to_awaitable(self.on_init)

lc_render async

lc_render() -> Element
Source code in rxxxt/component.py
268
269
270
271
272
273
274
async def lc_render(self) -> Element:
  await to_awaitable(self.on_before_update)
  el = await to_awaitable(self.render)
  try: self.context.execution.pending_updates.remove(self.context.id) # NOTE: remove any update that was requested during render
  except KeyError: pass
  await to_awaitable(self.on_after_update)
  return el

on_after_destroy

on_after_destroy() -> None | Awaitable[None]
Source code in rxxxt/component.py
292
def on_after_destroy(self) -> None | Awaitable[None]: ...

on_after_update

on_after_update() -> None | Awaitable[None]
Source code in rxxxt/component.py
290
def on_after_update(self) -> None | Awaitable[None]: ...

on_before_destroy

on_before_destroy() -> None | Awaitable[None]
Source code in rxxxt/component.py
291
def on_before_destroy(self) -> None | Awaitable[None]: ...

on_before_update

on_before_update() -> None | Awaitable[None]
Source code in rxxxt/component.py
289
def on_before_update(self) -> None | Awaitable[None]: ...

on_init

on_init() -> None | Awaitable[None]
Source code in rxxxt/component.py
288
def on_init(self) -> None | Awaitable[None]: ...

render abstractmethod

render() -> Element | Awaitable[Element]
Source code in rxxxt/component.py
294
295
@abstractmethod
def render(self) -> Element | Awaitable[Element]: ...

tonode

tonode(context: Context) -> Node
Source code in rxxxt/component.py
297
def tonode(self, context: Context) -> 'Node': return ComponentNode(context, self)

context_state module-attribute

context_state = partial(_field_state, get_context_state_key)

context_state_box module-attribute

context_state_box = partial(_box_state, get_context_state_key)

event_handler

event_handler(**kwargs: Any)
Source code in rxxxt/component.py
223
224
225
226
def event_handler(**kwargs: Any):
  options = InputEventDescriptorOptions.model_validate(kwargs)
  def _inner(fn: Callable[Concatenate[Any, FNP], FNR]) -> UnboundEventHandler[FNP, FNR]: return UnboundEventHandler(fn, options)
  return _inner

global_state module-attribute

global_state = partial(_field_state, get_global_state_key)

global_state_box module-attribute

global_state_box = partial(_box_state, get_global_state_key)

HandleNavigate

HandleNavigate(location: str)

Bases: CustomAttribute

Methods:

Name Description
tonode

Attributes:

Name Type Description
location
Source code in rxxxt/component.py
229
230
231
def __init__(self, location: str) -> None:
  super().__init__()
  self.location = location

location instance-attribute

location = location

tonode

tonode(context: Context, original_key: str) -> Node
Source code in rxxxt/component.py
233
234
def tonode(self, context: Context, original_key: str) -> Node:
  return TextNode(context, f"{html.escape(original_key)}=\"window.rxxxt.navigate('{html.escape(self.location)}');\"")

local_state module-attribute

local_state = partial(_field_state, get_local_state_key)

local_state_box module-attribute

local_state_box = partial(_box_state, get_local_state_key)

SharedExternalState

SharedExternalState(initial_value: T)

Bases: Generic[T]

Methods:

Name Description
__get__
update

Attributes:

Name Type Description
value T
Source code in rxxxt/component.py
155
156
157
def __init__(self, initial_value: T) -> None:
  self._components: weakref.WeakSet[Component] = weakref.WeakSet()
  self._value: T = initial_value

value property writable

value: T

__get__

__get__(obj: Any, objtype: Any = None)
Source code in rxxxt/component.py
168
169
170
171
172
173
174
def __get__(self, obj: Any, objtype: Any=None):
  if obj is None:
    return self
  if not isinstance(obj, Component):
    raise TypeError("SharedExternalState must only be accessed from inside a component!")
  self._components.add(obj)
  return self

update

update()
Source code in rxxxt/component.py
176
177
178
def update(self):
  for component in self._components:
    component.context.request_update()

StateBox

StateBox(key: str, state: State, default_factory: Callable[[], T], adapter: TypeAdapter[T])

Bases: Generic[T], StateCell

Methods:

Name Description
__enter__
__exit__
consume
detach
produce
update

Attributes:

Name Type Description
key
value
Source code in rxxxt/component.py
13
14
15
16
17
18
19
20
21
22
23
24
25
def __init__(self, key: str, state: State, default_factory: Callable[[], T], adapter: TypeAdapter[T]) -> None:
  super().__init__()
  self._key = key
  self._state = state
  self._adapter = adapter
  self._value: T

  key_state = state.get(key)
  key_state.add_consumer(self)
  try: self._value = adapter.validate_json(key_state.get())
  except ValueError:
    self._value = default_factory()
    key_state.set(self)

key property

key

value property writable

value

__enter__

__enter__()
Source code in rxxxt/component.py
27
def __enter__(self): return self.value

__exit__

__exit__(*_)
Source code in rxxxt/component.py
28
def __exit__(self, *_): self.update()

consume

consume(key: str, producer: Callable[[], str]) -> Any
Source code in rxxxt/component.py
47
48
def consume(self, key: str, producer: Callable[[], str]) -> Any:
  self._value = self._adapter.validate_json(producer())

detach

detach(key: str) -> Any
Source code in rxxxt/component.py
50
51
def detach(self, key: str) -> Any:
  del self._value

produce

produce(key: str) -> str
Source code in rxxxt/component.py
44
45
def produce(self, key: str) -> str:
  return self._adapter.dump_json(self._value).decode()

update

update()
Source code in rxxxt/component.py
41
42
def update(self):
  self._state.get(self._key).set(self)

add_attributes

add_attributes(base: HTMLAttributes, **kwargs: HTMLAttributeValue)
Source code in rxxxt/elements.py
166
167
def add_attributes(base: HTMLAttributes, **kwargs: HTMLAttributeValue):
  return merge_attributes(kwargs, base)

class_map

class_map(map: dict[str, bool])
Source code in rxxxt/elements.py
160
161
def class_map(map: dict[str, bool]):
  return " ".join([ k for k, v in map.items() if v ])

CustomAttribute

Bases: ABC

Methods:

Name Description
tonode

tonode abstractmethod

tonode(context: Context, original_key: str) -> Node
Source code in rxxxt/elements.py
16
17
@abstractmethod
def tonode(self, context: Context, original_key: str) -> Node: ...

El

Element

Bases: ABC

Methods:

Name Description
tonode

tonode abstractmethod

tonode(context: Context) -> Node
Source code in rxxxt/elements.py
12
13
@abstractmethod
def tonode(self, context: Context) -> Node: ...

ElementContent module-attribute

ElementContent = Iterable[Element | str]

ElementFactory

Bases: Protocol

Methods:

Name Description
__call__

__call__

__call__() -> Element
Source code in rxxxt/elements.py
149
def __call__(self) -> Element: ...

HTMLAttributes module-attribute

HTMLAttributes = dict[str, HTMLAttributeValue]

HTMLAttributeValue module-attribute

HTMLAttributeValue = str | bool | int | float | CustomAttribute | Callable | None

HTMLElement

HTMLElement(context: Context, tag: str, attributes: HTMLAttributes, content: ElementContent)
Source code in rxxxt/elements.py
84
85
86
87
@fn_element
def HTMLElement(context: Context, tag: str, attributes: HTMLAttributes, content: ElementContent):
  return ElementNode(context, tag, _html_attributes_to_nodes(context.sub("attributes"), attributes), \
    _element_content_to_ordered_nodes(context.sub("children"), content))

HTMLFragment

HTMLFragment(context: Context, content: ElementContent)
Source code in rxxxt/elements.py
76
77
78
@fn_element
def HTMLFragment(context: Context, content: ElementContent):
  return FragmentNode(context, _element_content_to_ordered_nodes(context, content))

HTMLVoidElement

HTMLVoidElement(context: Context, tag: str, attributes: HTMLAttributes)
Source code in rxxxt/elements.py
80
81
82
@fn_element
def HTMLVoidElement(context: Context, tag: str, attributes: HTMLAttributes):
  return VoidElementNode(context, tag, _html_attributes_to_nodes(context.sub("attributes"), attributes))

KeyedElement

KeyedElement(context: Context, key: str, element: Element)
Source code in rxxxt/elements.py
89
90
91
92
93
@fn_element
def KeyedElement(context: Context, key: str, element: Element):
  try: context = context.replace_index(key)
  except ValueError as e: logging.debug(f"Failed to replace index with key {key}", e)
  return element.tonode(context)

lazy_element

lazy_element(fn: Callable[Concatenate[Context, FNP], Element]) -> Callable[FNP, Element]
Source code in rxxxt/elements.py
71
72
73
74
def lazy_element(fn: Callable[Concatenate[Context, FNP], Element]) -> Callable[FNP, 'Element']:
  def _inner(context: Context, *args: FNP.args, **kwargs: FNP.kwargs) -> Node:
    return fn(context, *args, **kwargs).tonode(context)
  return fn_element(_inner)

merge_attributes

merge_attributes(a: HTMLAttributes, b: HTMLAttributes)
Source code in rxxxt/elements.py
163
164
def merge_attributes(a: HTMLAttributes, b: HTMLAttributes):
  return dict(_merge_attribute_items(itertools.chain(a.items(), b.items())))

query_selector_all_event

query_selector_all_event(name: str, selector: str, handler: Callable)
Source code in rxxxt/elements.py
157
158
def query_selector_all_event(name: str, selector: str, handler: Callable):
  return El["rxxxt-query-selector-event"](name=name, selector=selector, content=[], onemit=handler)

ScriptContent

ScriptContent(context: Context, script: str)
Source code in rxxxt/elements.py
111
112
113
@fn_element
def ScriptContent(context: Context, script: str):
  return TextNode(context, script.replace("</", "<\\/"))

TextElement

TextElement(context: Context, text: str)
Source code in rxxxt/elements.py
103
104
105
@fn_element
def TextElement(context: Context, text: str):
  return TextNode(context, html.escape(text))

UnescapedHTMLElement

UnescapedHTMLElement(context: Context, text: str)
Source code in rxxxt/elements.py
107
108
109
@fn_element
def UnescapedHTMLElement(context: Context, text: str):
  return TextNode(context, text)

VEl

window_event

window_event(name: str, handler: Callable)
Source code in rxxxt/elements.py
154
155
def window_event(name: str, handler: Callable):
  return El["rxxxt-window-event"](name=name, content=[], onemit=handler)

WithRegistered

WithRegistered(context: Context, register: dict[str, Any], child: Element)
Source code in rxxxt/elements.py
 99
100
101
@fn_element
def WithRegistered(context: Context, register: dict[str, Any], child: Element):
  return child.tonode(context.update_registry(register))

Context dataclass

Context(id: ContextStack, state: State, registry: dict[str, Any], config: ContextConfig, execution: Execution)

Classes:

Name Description
StateConsumer

Methods:

Name Description
__hash__
delete_cookie
emit
get_header
match_path
navigate
registered
replace_index
request_update
set_cookie
sub
subscribe
update_registry
use_websocket

Attributes:

Name Type Description
config ContextConfig
cookies dict[str, str]
execution Execution
id ContextStack
location
path
query_string
registry dict[str, Any]
sid
stack_sids
state State
update_consumer

config instance-attribute

config: ContextConfig

cookies property

cookies: dict[str, str]

execution instance-attribute

execution: Execution

id instance-attribute

id: ContextStack

location property

location

path property

path

query_string property

query_string

registry instance-attribute

registry: dict[str, Any]

sid cached property

sid

stack_sids property

stack_sids

state instance-attribute

state: State

update_consumer cached property

update_consumer

StateConsumer

StateConsumer(context: Context)

Bases: StateConsumer

Methods:

Name Description
consume
detach

Attributes:

Name Type Description
context
Source code in rxxxt/execution.py
88
def __init__(self, context: 'Context') -> None: self.context = context
context instance-attribute
context = context
consume
consume(key: str, producer: Callable[[], str]) -> Any
Source code in rxxxt/execution.py
89
def consume(self, key: str, producer: Callable[[], str]) -> Any: self.context.request_update()
detach
detach(key: str) -> Any
Source code in rxxxt/execution.py
90
def detach(self, key: str) -> Any: self.context.request_update()

__hash__

__hash__() -> int
Source code in rxxxt/execution.py
92
93
def __hash__(self) -> int:
  return hash(self.id)
delete_cookie(name: str, mirror_state: bool = True)
Source code in rxxxt/execution.py
176
177
178
179
def delete_cookie(self, name: str, mirror_state: bool = True):
  self.set_cookie(name=name, max_age=-1, mirror_state=False)
  if mirror_state:
    self.state.set_many({ "!header;cookie": "; ".join(f"{k}={v}" for k, v in self.cookies.items() if k != name) })

emit

emit(name: str, data: dict[str, int | float | str | bool | None])
Source code in rxxxt/execution.py
153
154
def emit(self, name: str, data: dict[str, int | float | str | bool | None]):
  self.execution.add_output_event(dict(event="custom", name=name, data=data))

get_header

get_header(name: str) -> tuple[str, ...]
Source code in rxxxt/execution.py
145
146
147
148
def get_header(self, name: str) -> tuple[str, ...]:
  header_lines = self._get_state_str_subscribe(f"!header;{name}")
  if header_lines is None: return ()
  else: return tuple(header_lines.splitlines())

match_path

match_path(pattern: str, re_flags: int = IGNORECASE)
Source code in rxxxt/execution.py
142
143
def match_path(self, pattern: str, re_flags: int = re.IGNORECASE):
  return match_path(pattern, self.path, re_flags)

navigate

navigate(location: str)
Source code in rxxxt/execution.py
156
157
158
159
def navigate(self, location: str):
  is_full_url = ":" in location # colon means full url
  if not is_full_url: self.state.get("!location").set(location)
  self.execution.add_output_event(dict(event="navigate", location=location, requires_refresh=is_full_url or None))

registered

registered(name: str, t: type[T]) -> T
Source code in rxxxt/execution.py
137
138
139
140
def registered(self, name: str, t: type[T]) -> T:
  if not isinstance((val:=self.registry.get(name)), t):
    raise TypeError(f"Invalid type in get_registered '{type(val)}'!")
  return val

replace_index

replace_index(key: str)
Source code in rxxxt/execution.py
133
134
135
def replace_index(self, key: str):
  if isinstance(self.id[-1], int): return dataclasses.replace(self, id=self.id[:-1] + (key,))
  raise ValueError("No index to replace!")

request_update

request_update()
Source code in rxxxt/execution.py
150
def request_update(self): self.execution.request_update(self.id)
set_cookie(name: str, value: str | None = None, expires: datetime | None = None, path: str | None = None, secure: bool | None = None, http_only: bool | None = None, domain: str | None = None, max_age: int | None = None, mirror_state: bool = True)
Source code in rxxxt/execution.py
163
164
165
166
167
168
169
170
171
172
173
174
def set_cookie(self, name: str, value: str | None = None, expires: datetime | None = None, path: str | None = None,
              secure: bool | None = None, http_only: bool | None = None, domain: str | None = None, max_age: int | None = None, mirror_state: bool = True):
  if not re.match(r'^[^=;, \t\n\r\f\v]+$', name): raise ValueError("Invalid cookie name")
  if value is not None and not re.match(r'^[^;, \t\n\r\f\v]+$', value): raise ValueError("Invalid value.")
  if domain is not None and not re.match(r'^[^;, \t\n\r\f\v]+$', domain): raise ValueError("Invalid domain.")
  if path is not None and not re.match(r'^[^\x00-\x20;,\s]+$', path): raise ValueError("Invalid path.")

  expires_str = None if expires is None else expires.isoformat()

  self.execution.add_output_event(dict(event="set-cookie", name=name, value=value, expires=expires_str, path=path, secure=secure, http_only=http_only, domain=domain, max_age=max_age))
  if mirror_state:
    self.state.set_many({ "!header;cookie": "; ".join(f"{k}={v}" for k, v in (self.cookies | { name: value }).items()) })

sub

sub(key: ContextStackKey)
Source code in rxxxt/execution.py
132
def sub(self, key: ContextStackKey): return dataclasses.replace(self, id=self.id + (key,))

subscribe

subscribe(key: str)
Source code in rxxxt/execution.py
151
def subscribe(self, key: str): self.state.get(key).add_consumer(self.update_consumer)

update_registry

update_registry(registry: dict[str, Any])
Source code in rxxxt/execution.py
136
def update_registry(self, registry: dict[str, Any]): return dataclasses.replace(self, registry=self.registry | registry)

use_websocket

use_websocket(websocket: bool = True)
Source code in rxxxt/execution.py
161
def use_websocket(self, websocket: bool = True): self.execution.add_output_event(dict(event="use-websocket", websocket=websocket))

InputEventDescriptorOptions

Bases: BaseModel

Attributes:

Name Type Description
debounce int | None
default_params dict[str, int | float | str | bool | None] | None
no_trigger bool
param_map dict[str, str]
prevent_default bool
throttle int | None

debounce class-attribute instance-attribute

debounce: int | None = None

default_params class-attribute instance-attribute

default_params: dict[str, int | float | str | bool | None] | None = None

no_trigger class-attribute instance-attribute

no_trigger: bool = False

param_map class-attribute instance-attribute

param_map: dict[str, str] = field(default_factory=dict)

prevent_default class-attribute instance-attribute

prevent_default: bool = False

throttle class-attribute instance-attribute

throttle: int | None = None

State

State()

Methods:

Name Description
cleanup
delete
destroy
get
get_key_values
set_many

Attributes:

Name Type Description
keys
Source code in rxxxt/state.py
68
69
def __init__(self) -> None:
  self._key_states: dict[str, KeyState] = {}

keys property

keys

cleanup

cleanup(inactive_prefixes: Set[str])
Source code in rxxxt/state.py
92
93
94
95
96
def cleanup(self, inactive_prefixes: Set[str]):
  active_keys = self._get_active_keys(inactive_prefixes)
  inactive_keys = tuple(key for key in self._key_states.keys() if key not in active_keys)
  for key in inactive_keys:
    return self.delete(key)

delete

delete(key: str)
Source code in rxxxt/state.py
83
84
85
86
def delete(self, key: str):
  state = self._key_states.pop(key, None)
  if state is not None:
    state.destroy()

destroy

destroy()
Source code in rxxxt/state.py
 98
 99
100
101
def destroy(self):
  for state in self._key_states.values():
    state.destroy()
  self._key_states.clear()

get

get(key: str)
Source code in rxxxt/state.py
74
75
76
77
78
def get(self, key: str):
  if (state := self._key_states.get(key)) is None:
    state = KeyState(key, None)
    self._key_states[key] = state
  return state

get_key_values

get_key_values(inactive_prefixes: Set[str])
Source code in rxxxt/state.py
88
89
90
def get_key_values(self, inactive_prefixes: Set[str]):
  active_keys = self._get_active_keys(inactive_prefixes)
  return { key: state.get() for key, state in self._key_states.items() if key in active_keys and state.has_value }

set_many

set_many(kvs: dict[str, str])
Source code in rxxxt/state.py
80
81
def set_many(self, kvs: dict[str, str]):
  for k, v in kvs.items(): self.get(k).set(v)

JWTError

Bases: Exception

JWTManager

JWTManager(secret: bytes, max_age: timedelta, algorithm: str = 'HS512')

Classes:

Name Description
JWTHeader
JWTPayloadValidations

Methods:

Name Description
b64_url_decode
b64_url_encode
encode_json
sign
verify

Attributes:

Name Type Description
JWTPayloadAdapter
Source code in rxxxt/helpers.py
79
80
81
82
83
84
85
def __init__(self, secret: bytes, max_age: timedelta, algorithm: str = "HS512") -> None:
  super().__init__()
  self._secret = secret
  self._max_age: timedelta = max_age
  self._algorithm = algorithm
  self._digest = { "HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512 }[algorithm]
  self._jwt_header = JWTManager.encode_json({ "typ": "JWT", "alg": self._algorithm }) + b"."

JWTPayloadAdapter class-attribute instance-attribute

JWTPayloadAdapter = TypeAdapter(dict[str, Any])

JWTHeader

Bases: BaseModel

Attributes:

Name Type Description
alg str
typ Literal['JWT']
alg instance-attribute
alg: str
typ instance-attribute
typ: Literal['JWT']

JWTPayloadValidations

Bases: BaseModel

Methods:

Name Description
is_valid

Attributes:

Name Type Description
exp int
exp instance-attribute
exp: int
is_valid
is_valid()
Source code in rxxxt/helpers.py
73
74
75
def is_valid(self):
  expires_dt = datetime.fromtimestamp(self.exp, timezone.utc)
  return expires_dt >= datetime.now(tz=timezone.utc)

b64_url_decode staticmethod

b64_url_decode(value: bytes | bytearray)
Source code in rxxxt/helpers.py
131
132
133
@staticmethod
def b64_url_decode(value: bytes | bytearray):
  return base64.urlsafe_b64decode(value + b"=" * (4 - len(value) % 4))

b64_url_encode staticmethod

b64_url_encode(value: bytes | bytearray)
Source code in rxxxt/helpers.py
127
128
129
@staticmethod
def b64_url_encode(value: bytes | bytearray):
  return base64.urlsafe_b64encode(value).rstrip(b"=")

encode_json staticmethod

encode_json(obj: Any)
Source code in rxxxt/helpers.py
123
124
125
@staticmethod
def encode_json(obj: Any):
  return JWTManager.b64_url_encode(json.dumps(obj).encode())

sign

sign(extra_fields: dict[str, Any])
Source code in rxxxt/helpers.py
87
88
89
90
91
92
93
94
95
96
97
98
99
def sign(self, extra_fields: dict[str, Any]):
  try:
    expires_at = int((datetime.now(tz=timezone.utc) + self._max_age).timestamp())
    stream = io.BytesIO()
    _ = stream.write(self._jwt_header)
    _ = stream.write(JWTManager.encode_json({ "exp": expires_at, **extra_fields }))
    signature = hmac.digest(self._secret, stream.getvalue(), self._digest)
    _ = stream.write(b".")
    _ = stream.write(JWTManager.b64_url_encode(signature))
    return stream.getvalue().decode()
  except Exception as e:
    if not isinstance(e, JWTError): raise JWTError(e)
    else: raise e

verify

verify(token: str)
Source code in rxxxt/helpers.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def verify(self, token: str):
  try:
    parts = token.encode().split(b".")
    if len(parts) != 3: raise JWTError("invalid format (expected 3 parts)")

    header = JWTManager.JWTHeader.model_validate_json(JWTManager.b64_url_decode(parts[0]))
    if header.alg != self._algorithm: raise JWTError("invalid algorithm in header")

    ref_signature = hmac.digest(self._secret, parts[0] + b"." + parts[1], self._digest)
    if not hmac.compare_digest(JWTManager.b64_url_decode(parts[2]), ref_signature):
      raise JWTError("invalid JWT signature!")

    full_payload = JWTManager.JWTPayloadAdapter.validate_json(JWTManager.b64_url_decode(parts[1]))
    if not JWTManager.JWTPayloadValidations.model_validate(full_payload).is_valid():
      raise JWTError("token expired")

    full_payload.pop("exp", None)
    return full_payload
  except Exception as e:
    if not isinstance(e, JWTError): raise JWTError(e)
    else: raise e

match_path

match_path(pattern: str, path: str, re_flags: int = IGNORECASE)
Source code in rxxxt/helpers.py
57
58
def match_path(pattern: str, path: str, re_flags: int = re.IGNORECASE):
  return _compile_matcher(pattern, re_flags)(path)

default_page

default_page(header: Element, content: Element, body_end: Element)
Source code in rxxxt/page.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def default_page(header: Element, content: Element, body_end: Element):
  return HTMLFragment([
    VEl["!DOCTYPE"](html=None),
    El.html(content=[
      El.head(content=[
        VEl.meta(charset="UTF-8"),
        VEl.meta(name="viewport", content="width=device-width, initial-scale=1.0"),
        header
      ]),
      El.body(content=[
        content,
        body_end
      ])
    ])
  ])

PageBuilder

PageBuilder(page_factory: PageFactory = default_page)

Bases: PageFactory

Methods:

Name Description
__call__
add_body_end
add_body_script
add_header
add_header_script
add_stylesheet
Source code in rxxxt/page.py
24
25
26
27
def __init__(self, page_factory: PageFactory = default_page) -> None:
  self._header_elements: list[Element] = []
  self._body_end_elements: list[Element] = []
  self._page_factory = page_factory

__call__

__call__(header: Element, content: Element, body_end: Element) -> Element
Source code in rxxxt/page.py
38
39
def __call__(self, header: Element, content: Element, body_end: Element) -> Element:
  return self._page_factory(HTMLFragment([ header, *self._header_elements ]), content, HTMLFragment([ body_end, *self._body_end_elements ]))

add_body_end

add_body_end(el: Element)
Source code in rxxxt/page.py
36
def add_body_end(self, el: Element): self._body_end_elements.append(el)

add_body_script

add_body_script(url: str, content: ElementContent = (), **kwargs: HTMLAttributeValue)
Source code in rxxxt/page.py
32
33
def add_body_script(self, url: str, content: ElementContent = (), **kwargs: HTMLAttributeValue):
  self.add_body_end(El.script(src=url, content=content, **kwargs))

add_header

add_header(el: Element)
Source code in rxxxt/page.py
35
def add_header(self, el: Element): self._header_elements.append(el)

add_header_script

add_header_script(url: str, content: ElementContent = (), **kwargs: HTMLAttributeValue)
Source code in rxxxt/page.py
30
31
def add_header_script(self, url: str, content: ElementContent = (), **kwargs: HTMLAttributeValue):
  self.add_header(El.script(src=url, content=content, **kwargs))

add_stylesheet

add_stylesheet(url: str, **kwargs: HTMLAttributeValue)
Source code in rxxxt/page.py
29
def add_stylesheet(self, url: str, **kwargs: HTMLAttributeValue): self.add_header(VEl.link(rel="stylesheet", href=url, **kwargs))

PageFactory

Bases: Protocol

Methods:

Name Description
__call__

__call__

__call__(header: Element, content: Element, body_end: Element) -> Element
Source code in rxxxt/page.py
5
def __call__(self, header: Element, content: Element, body_end: Element) -> Element: ...

Router

Router()

Bases: ElementFactory

Classes:

Name Description
RoutedComponent

Methods:

Name Description
__call__
add_route
add_router
route
Source code in rxxxt/router.py
34
35
def __init__(self) -> None:
  self._routes: list[tuple[str, ElementFactory]] = []

RoutedComponent

RoutedComponent(routes: tuple[tuple[str, ElementFactory], ...])

Bases: Component

Methods:

Name Description
on_before_update
render

Attributes:

Name Type Description
params
Source code in rxxxt/router.py
12
13
14
15
def __init__(self, routes: tuple[tuple[str, ElementFactory], ...]):
  super().__init__()
  self._routes = routes
  self._selected_match: tuple[int, ElementFactory, dict[str, str]] | None = None
params class-attribute instance-attribute
params = router_params()
on_before_update async
on_before_update() -> None
Source code in rxxxt/router.py
17
18
19
async def on_before_update(self) -> None:
  self._selected_match = self._get_current_match()
  self.params = typing.cast(dict[str, str], dict()) if self._selected_match is None else self._selected_match[2]
render
render() -> Element
Source code in rxxxt/router.py
21
22
23
24
25
def render(self) -> Element:
  if self._selected_match is None:
    return El.h1(content=["Not found!"])
  else:
    return TaggedElement(str(self._selected_match[0]), self._selected_match[1]())

__call__

__call__() -> Element
Source code in rxxxt/router.py
45
def __call__(self) -> Element: return Router.RoutedComponent(tuple(self._routes))

add_route

add_route(pattern: str, element_factory: ElementFactory)
Source code in rxxxt/router.py
38
def add_route(self, pattern: str, element_factory: ElementFactory): self._routes.append((pattern, element_factory))

add_router

add_router(router: Router)
Source code in rxxxt/router.py
37
def add_router(self, router: 'Router'): self._routes.extend(router._routes)

route

route(pattern: str)
Source code in rxxxt/router.py
39
40
41
42
43
def route(self, pattern: str):
  def _inner(fn: ElementFactory):
    self.add_route(pattern, fn)
    return fn
  return _inner

router_params

router_params()
Source code in rxxxt/router.py
6
def router_params(): return context_state(dict[str, str], name="*rp*")

AppConfig dataclass

AppConfig(enable_web_socket_state_updates: bool | None = None, disable_http_update_retry: bool | None = None)

Attributes:

Name Type Description
disable_http_update_retry bool | None
enable_web_socket_state_updates bool | None

disable_http_update_retry class-attribute instance-attribute

disable_http_update_retry: bool | None = None

enable_web_socket_state_updates class-attribute instance-attribute

enable_web_socket_state_updates: bool | None = None

default_state_resolver

default_state_resolver() -> JWTStateResolver

Creates a JWTStateResolver. Uses the environment variable JWT_SECRET as its secret, if set, otherwise creates a new random, temporary secret.

Source code in rxxxt/state.py
132
133
134
135
136
137
138
139
140
141
def default_state_resolver() -> JWTStateResolver:
  """
  Creates a JWTStateResolver.
  Uses the environment variable `JWT_SECRET` as its secret, if set, otherwise creates a new random, temporary secret.
  """

  jwt_secret = os.getenv("JWT_SECRET", None)
  if jwt_secret is None: jwt_secret = secrets.token_bytes(64)
  else: jwt_secret = jwt_secret.encode("utf-8")
  return JWTStateResolver(jwt_secret)

JWTStateResolver

JWTStateResolver(secret: bytes, max_age: timedelta | None = None, algorithm: str = 'HS512')

Bases: StateResolver

Methods:

Name Description
create_token
resolve

Attributes:

Name Type Description
StateDataAdapter
Source code in rxxxt/state.py
118
119
120
def __init__(self, secret: bytes, max_age: timedelta | None = None, algorithm: str = "HS512") -> None:
  super().__init__()
  self._jwt_manager = JWTManager(secret, timedelta(days=1) if max_age is None else max_age, algorithm)

StateDataAdapter class-attribute instance-attribute

StateDataAdapter = TypeAdapter(dict[str, str])

create_token

create_token(data: dict[str, str], old_token: str | None) -> str
Source code in rxxxt/state.py
122
123
124
def create_token(self, data: dict[str, str], old_token: str | None) -> str:
  try: return self._jwt_manager.sign({ "d": data })
  except JWTError as e: raise StateResolverError(e)

resolve

resolve(token: str) -> dict[str, str]
Source code in rxxxt/state.py
126
127
128
129
130
def resolve(self, token: str) -> dict[str, str]:
  try:
    payload = self._jwt_manager.verify(token)
    return JWTStateResolver.StateDataAdapter.validate_python(payload["d"])
  except (ValidationError, JWTError) as e: raise StateResolverError(e)

State

State()

Methods:

Name Description
cleanup
delete
destroy
get
get_key_values
set_many

Attributes:

Name Type Description
keys
Source code in rxxxt/state.py
68
69
def __init__(self) -> None:
  self._key_states: dict[str, KeyState] = {}

keys property

keys

cleanup

cleanup(inactive_prefixes: Set[str])
Source code in rxxxt/state.py
92
93
94
95
96
def cleanup(self, inactive_prefixes: Set[str]):
  active_keys = self._get_active_keys(inactive_prefixes)
  inactive_keys = tuple(key for key in self._key_states.keys() if key not in active_keys)
  for key in inactive_keys:
    return self.delete(key)

delete

delete(key: str)
Source code in rxxxt/state.py
83
84
85
86
def delete(self, key: str):
  state = self._key_states.pop(key, None)
  if state is not None:
    state.destroy()

destroy

destroy()
Source code in rxxxt/state.py
 98
 99
100
101
def destroy(self):
  for state in self._key_states.values():
    state.destroy()
  self._key_states.clear()

get

get(key: str)
Source code in rxxxt/state.py
74
75
76
77
78
def get(self, key: str):
  if (state := self._key_states.get(key)) is None:
    state = KeyState(key, None)
    self._key_states[key] = state
  return state

get_key_values

get_key_values(inactive_prefixes: Set[str])
Source code in rxxxt/state.py
88
89
90
def get_key_values(self, inactive_prefixes: Set[str]):
  active_keys = self._get_active_keys(inactive_prefixes)
  return { key: state.get() for key, state in self._key_states.items() if key in active_keys and state.has_value }

set_many

set_many(kvs: dict[str, str])
Source code in rxxxt/state.py
80
81
def set_many(self, kvs: dict[str, str]):
  for k, v in kvs.items(): self.get(k).set(v)

StateResolver

Bases: ABC

Methods:

Name Description
create_token
resolve

create_token abstractmethod

create_token(data: dict[str, str], old_token: str | None) -> str | Awaitable[str]
Source code in rxxxt/state.py
110
111
@abstractmethod
def create_token(self, data: dict[str, str], old_token: str | None) -> str | Awaitable[str]: pass

resolve abstractmethod

resolve(token: str) -> dict[str, str] | Awaitable[dict[str, str]]
Source code in rxxxt/state.py
112
113
@abstractmethod
def resolve(self, token: str) -> dict[str, str] | Awaitable[dict[str, str]]: pass