Skip to content

API

App

App(content: ElementFactory, state_resolver: StateResolver | None = None, page_factory: PageFactory = default_page, config: AppConfig | None = None)

Methods:

Name Description
__call__
Source code in rxxxt/app.py
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, content: ElementFactory, state_resolver: StateResolver | None = None, page_factory: PageFactory = default_page, \
    config: AppConfig | None = None) -> None:
  """
  Store the app's collaborators and register its ASGI handlers on a fresh Composer.
  args:
    content: ElementFactory - stored as the app's content factory
    state_resolver: StateResolver | None - replaced by default_state_resolver() when falsy
    page_factory: PageFactory - stored as the page factory
    config: AppConfig | None - replaced by AppConfig() when falsy
  """
  self._content = content
  self._page_factory: PageFactory = page_factory
  self._state_resolver = state_resolver or default_state_resolver()
  self._composer = Composer()
  self._config = config or AppConfig()
  # Registration order: static client script route, POST session, GET session, websocket session, not-found fallback.
  # The return value of add_handler is discarded explicitly with `_ =`.
  _ = self._composer.add_handler(http_handler(routed_handler("/rxxxt-client.js")(self._http_static_rxxxt_client_js)))
  _ = self._composer.add_handler(http_handler(self._http_post_session))
  _ = self._composer.add_handler(http_handler(self._http_get_session))
  _ = self._composer.add_handler(websocket_handler(self._ws_session))
  _ = self._composer.add_handler(http_not_found_handler)

__call__ async

__call__(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any
Source code in rxxxt/app.py
39
40
async def __call__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
  """
  ASGI entry point: forwards scope, receive and send to the internal composer and returns its result.
  """
  return await self._composer(scope, receive, send)

ASGIFnReceive module-attribute

ASGIFnReceive = Callable[[], Awaitable[MutableMapping[str, Any]]]

ASGIFnSend module-attribute

ASGIFnSend = Callable[[MutableMapping[str, Any]], Awaitable[Any]]

ASGIHandler module-attribute

ASGIHandler = Callable[[ASGIScope, ASGIFnReceive, ASGIFnSend], Awaitable[Any]]

ASGINextException

Bases: Exception

ASGIScope module-attribute

ASGIScope = MutableMapping[str, Any]

Composer

Composer()

Methods:

Name Description
__call__
add_handler
Source code in rxxxt/asgi.py
209
210
def __init__(self) -> None:
  """
  Create a composer with an empty handler list.
  """
  self._handlers: list[ASGIHandler] = []

__call__ async

__call__(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any
Source code in rxxxt/asgi.py
216
217
218
219
220
221
222
223
224
225
226
227
async def __call__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
  """
  Call the registered handlers in order and return the result of the first one that does not raise ASGINextException.
  Returns None implicitly when every handler raises ASGINextException, and when an error occurs for a scope type
  other than "websocket" or "http".
  """
  try:
    for handler in self._handlers:
      try: return await handler(scope, receive, send)
      except ASGINextException: pass # this handler declined the request; try the next one
  except asyncio.CancelledError: raise # cancellation is re-raised, never routed to the error handlers
  except BaseException as e:
    # NOTE(review): BaseException also covers KeyboardInterrupt/SystemExit - confirm this is intended.
    logging.debug("asgi error", exc_info=True, stack_info=True)
    if scope["type"] == "websocket":
      return await self._ws_error_handler(WebsocketContext(scope, receive, send), e)
    if scope["type"] == "http":
      return await self._http_error_handler(HTTPContext(scope, receive, send), e)

add_handler

add_handler(handler: ASGIHandler)
Source code in rxxxt/asgi.py
212
213
214
def add_handler(self, handler: ASGIHandler):
  """
  Append a handler to the end of the handler list and hand the same handler back.
  args:
    handler: ASGIHandler - the handler to register
  """
  registered_handlers = self._handlers
  registered_handlers.append(handler)
  return handler

http_handler

http_handler(fn: Callable[[HTTPContext], Awaitable[Any]])
Source code in rxxxt/asgi.py
183
184
185
186
187
def http_handler(fn: Callable[[HTTPContext], Awaitable[Any]]):
  """
  Wrap a function taking an HTTPContext into an ASGI callable.
  The wrapper raises ASGINextException for every scope whose type is not "http".
  args:
    fn: the coroutine function that receives the HTTPContext
  """
  async def _inner(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
    if scope["type"] == "http":
      return await fn(HTTPContext(scope, receive, send))
    raise ASGINextException()
  return _inner

http_not_found_handler async

http_not_found_handler(context: HTTPContext)
Source code in rxxxt/asgi.py
204
205
206
@http_handler
async def http_not_found_handler(context: HTTPContext):
  """
  Respond with the plain text "not found" and status 404.
  """
  await context.respond_text("not found", 404)

HTTPContext

HTTPContext(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend)

Bases: TransportContext

Methods:

Name Description
add_response_headers
receive_bytes
receive_iter
receive_json
receive_json_raw
receive_text
respond_file
respond_text
response_body
response_start

Attributes:

Name Type Description
method
Source code in rxxxt/asgi.py
101
102
103
def __init__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> None:
  """
  Initialise the base context and start with an empty list of response headers.
  """
  super().__init__(scope, receive, send)
  # Headers collected here are sent by response_start.
  self._response_headers: list[tuple[BytesLike, BytesLike]] = []

method property

method

add_response_headers

add_response_headers(headers: ASGIHeaders)
Source code in rxxxt/asgi.py
108
def add_response_headers(self, headers: ASGIHeaders):
  """
  Append the given headers to the pending response headers.
  args:
    headers: ASGIHeaders - the header pairs to add
  """
  self._response_headers.extend(headers)

receive_bytes async

receive_bytes() -> bytes
Source code in rxxxt/asgi.py
168
169
170
171
172
async def receive_bytes(self) -> bytes:
  """
  Read every chunk produced by receive_iter and return them concatenated.
  """
  buffer = io.BytesIO()
  chunks = self.receive_iter()
  async for part in chunks:
    buffer.write(part)
  body = buffer.getvalue()
  return body

receive_iter async

receive_iter() -> AsyncGenerator[bytes, Any]
Source code in rxxxt/asgi.py
174
175
176
177
178
179
180
181
async def receive_iter(self) -> AsyncGenerator[bytes, Any]:
  """
  Yield the body of each "http.request" event until one arrives without more_body.
  Stops without yielding on "http.disconnect"; events of any other type are ignored.
  """
  finished = False
  while not finished:
    message = await self.receive()
    kind = message.get("type")
    if kind == "http.disconnect":
      finished = True
    elif kind == "http.request":
      yield message.get("body", b"")
      if not message.get("more_body", False):
        finished = True

receive_json async

receive_json()
Source code in rxxxt/asgi.py
154
async def receive_json(self):
  """
  Read the request body through receive_json_raw and parse it with json.loads.
  """
  raw_text = await self.receive_json_raw()
  return json.loads(raw_text)

receive_json_raw async

receive_json_raw()
Source code in rxxxt/asgi.py
156
async def receive_json_raw(self):
  """
  Read the request body as text, allowing only the mime type "application/json".
  """
  json_mime_types = { "application/json" }
  return await self.receive_text(json_mime_types)

receive_text async

receive_text(allowed_mime_types: Iterable[str])
Source code in rxxxt/asgi.py
158
159
160
161
162
163
164
165
166
async def receive_text(self, allowed_mime_types: Iterable[str]):
  """
  Read the whole request body and decode it with the charset from the content type (default "utf-8").
  Bytes that cannot be decoded are dropped because the decoder runs with the "ignore" error mode.
  args:
    allowed_mime_types: Iterable[str] - mime types the request may have
  raises:
    ValueError - when the mime type is not allowed or the charset is unknown
  """
  if isinstance(allowed_mime_types, set):
    accepted = allowed_mime_types
  else:
    accepted = set(allowed_mime_types)
  mime_type, ct_params = self.content_type
  if mime_type not in accepted:
    raise ValueError(f"Mime type '{mime_type}' is not in allowed types!")
  charset = ct_params.get("charset", "utf-8")
  try:
    decode = codecs.getdecoder(charset)
  except LookupError:
    raise ValueError("Invalid content-type encoding!")
  raw = await self.receive_bytes()
  text, _consumed = decode(raw, "ignore")
  return text

respond_file async

respond_file(path: str | Path, mime_type: str | None = None, handle_404: bool = False, use_last_modified: bool = False)
Source code in rxxxt/asgi.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
async def respond_file(self, path: str | pathlib.Path, mime_type: str | None = None, handle_404: bool = False, use_last_modified: bool = False):
  mime_type = mime_type or mimetypes.guess_type(path)[0]
  if mime_type is None: raise ValueError("Unknown mime type!")
  ppath = pathlib.Path(path)
  if handle_404 and not ppath.exists():
    return await self.respond_text("not found", 404)

  with open(ppath, "rb") as fd:
    fd_stat = os.stat(fd.fileno())

    if use_last_modified:
      last_modified = formatdate(fd_stat.st_mtime, usegmt=True).encode()
      self.add_response_headers([ (b"Last-Modified", last_modified) ])
      if (last_modified,) == self.headers.get("If-Modified-Since", None):
        await self.response_start(304)
        await self.response_body(b"", False)
        return

    self.add_response_headers(content_headers(fd_stat.st_size, mime_type))
    await self.response_start(200)
    while len(data := fd.read(1_000_000)) != 0:
      await self.response_body(data, fd.tell() != fd_stat.st_size)

respond_text async

respond_text(text: str, status: int = 200, mime_type: str = 'text/plain')
Source code in rxxxt/asgi.py
125
126
127
128
129
async def respond_text(self, text: str, status: int = 200, mime_type: str = "text/plain"):
  """
  Send text as a complete response, encoded as UTF-8.
  args:
    text: str - the response text
    status: int - the response status code
    mime_type: str - mime type; "; charset=utf-8" is appended to it
  """
  payload = text.encode("utf-8")
  content_type = mime_type + "; charset=utf-8"
  headers = content_headers(len(payload), content_type)
  self.add_response_headers(headers)
  await self.response_start(status)
  await self.response_body(payload, False)

response_body async

response_body(data: BytesLike, more_body: bool)
Source code in rxxxt/asgi.py
118
119
120
121
122
123
async def response_body(self, data: BytesLike, more_body: bool):
  """
  Send one "http.response.body" event.
  args:
    data: BytesLike - the body bytes of this event
    more_body: bool - whether further body events follow
  """
  body_event = { "type": "http.response.body", "body": data, "more_body": more_body }
  await self.send(body_event)

response_start async

response_start(status: int, trailers: bool = False)
Source code in rxxxt/asgi.py
110
111
112
113
114
115
116
async def response_start(self, status: int, trailers: bool = False):
  """
  Send the "http.response.start" event with the collected response headers.
  args:
    status: int - the response status code
    trailers: bool - value of the event's "trailers" field
  """
  start_event = {
    "type": "http.response.start",
    "status": status,
    "headers": self._response_headers,
    "trailers": trailers,
  }
  await self.send(start_event)

routed_handler

routed_handler(pattern: str)
Source code in rxxxt/asgi.py
196
197
198
199
200
201
202
def routed_handler(pattern: str):
  """
  Decorator factory that only runs the wrapped function when the context path matches pattern.
  When match_path returns None, context.next() is called; the wrapped function receives the context and the match otherwise.
  args:
    pattern: str - the path pattern handed to match_path
  """
  def _inner(fn: Callable[[CTXT, dict[str, str]], Awaitable[Any]]) -> Callable[[CTXT], Awaitable[Any]]:
    async def _inner_inner(context: CTXT) -> Any:
      match = match_path(pattern, context.path)
      if match is None:
        context.next()
      return await fn(context, match)
    return _inner_inner
  return _inner

TransportContext

TransportContext(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend)

Methods:

Name Description
next

Attributes:

Name Type Description
content_type
fullpath
headers
location
path
query_string str | None
receive
scope
send
Source code in rxxxt/asgi.py
19
20
21
22
def __init__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> None:
  """
  Keep the ASGI scope and the receive/send callables as public attributes.
  """
  self.scope = scope
  self.receive = receive
  self.send = send

content_type cached property

content_type

fullpath property

fullpath

headers cached property

headers

location property

location

path property

path

query_string property

query_string: str | None

receive instance-attribute

receive = receive

scope instance-attribute

scope = scope

send instance-attribute

send = send

next

next()
Source code in rxxxt/asgi.py
58
def next(self):
  """
  Decline the current request by raising ASGINextException.
  """
  raise ASGINextException()

websocket_handler

websocket_handler(fn: Callable[[WebsocketContext], Awaitable[Any]])
Source code in rxxxt/asgi.py
189
190
191
192
193
def websocket_handler(fn: Callable[[WebsocketContext], Awaitable[Any]]):
  """
  Wrap a function taking a WebsocketContext into an ASGI callable.
  The wrapper raises ASGINextException for every scope whose type is not "websocket".
  args:
    fn: the coroutine function that receives the WebsocketContext
  """
  async def _inner(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> Any:
    if scope["type"] == "websocket":
      return await fn(WebsocketContext(scope, receive, send))
    raise ASGINextException()
  return _inner

WebsocketContext

WebsocketContext(scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend)

Bases: TransportContext

Methods:

Name Description
close
receive_message
send_message
setup

Attributes:

Name Type Description
connected
Source code in rxxxt/asgi.py
61
62
63
def __init__(self, scope: ASGIScope, receive: ASGIFnReceive, send: ASGIFnSend) -> None:
  """
  Initialise the base context and mark the connection as connected.
  """
  super().__init__(scope, receive, send)
  # Set to False by close() and when receive_message sees a disconnect event.
  self._connected = True

connected property

connected

close async

close(code: int = 1000, reason: str = 'Normal Closure')
Source code in rxxxt/asgi.py
90
91
92
async def close(self, code: int = 1000, reason: str = "Normal Closure"):
  """
  Send a "websocket.close" event and mark the context as disconnected afterwards.
  args:
    code: int - the close code
    reason: str - the close reason
  """
  close_event = { "type": "websocket.close", "code": code, "reason": reason }
  await self.send(close_event)
  self._connected = False

receive_message async

receive_message() -> BytesLike | str
Source code in rxxxt/asgi.py
73
74
75
76
77
78
79
80
81
async def receive_message(self) -> BytesLike | str:
  """
  Wait for the next websocket message and return its payload.
  The "bytes" payload is preferred; when it is missing or None the "text" payload is returned.
  Events that are neither "websocket.receive" nor "websocket.disconnect" are skipped.
  raises:
    ConnectionError - when the context is not connected or a disconnect event arrives
  """
  while self._connected:
    event = await self.receive()
    event_type = event["type"]
    if event_type == "websocket.disconnect":
      self._connected = False
      raise ConnectionError("Connection closed!")
    if event_type == "websocket.receive":
      # An event may carry both keys with one of them set to None, so a key being present
      # does not mean it holds the payload.
      payload = event.get("bytes")
      return event.get("text") if payload is None else payload
  raise ConnectionError("Connection closed!")

send_message async

send_message(data: str | BytesLike)
Source code in rxxxt/asgi.py
83
84
85
86
87
88
async def send_message(self, data: str | BytesLike):
  """
  Send one "websocket.send" event; str data goes into "text", anything else into "bytes".
  args:
    data: str | BytesLike - the message payload
  raises:
    ConnectionError - when the context is not connected
  """
  if not self._connected:
    raise ConnectionError("Not connected!")
  payload_field = "text" if isinstance(data, str) else "bytes"
  message: dict[str, Any] = { "type": "websocket.send", "bytes": None, "text": None }
  message[payload_field] = data
  await self.send(message)

setup async

setup(headers: ASGIHeaders = (), subprotocol: str | None = None)
Source code in rxxxt/asgi.py
68
69
70
71
async def setup(self, headers: ASGIHeaders = (), subprotocol: str | None = None):
  event = await self.receive()
  if event["type"] != "websocket.connect": raise ConnectionError("Did not receive connect event!")
  await self.send({ "type": "websocket.accept", "subprotocol": subprotocol, "headers": [ (name.lower(), value) for name, value in headers ] })

Component

Component()

Bases: Element

Methods:

Name Description
add_job

Runs a background job until completion. Only runs when the session is persistent.

add_worker

Runs a background worker, which may be cancelled at any time. Only runs when the session is persistent.

lc_destroy
lc_handle_event
lc_init
lc_render
on_after_destroy
on_after_update
on_before_destroy
on_before_update
on_init
render
tonode

Attributes:

Name Type Description
context Context
Source code in rxxxt/component.py
213
214
215
216
217
def __init__(self) -> None:
  """
  Initialise the component with empty worker and job task lists.
  """
  super().__init__()
  self.context: Context # declared only; assigned in lc_init
  self._worker_tasks: list[asyncio.Task[Any]] = [] # cancelled in lc_destroy
  self._job_tasks: list[asyncio.Task[Any]] = [] # awaited in lc_destroy

context instance-attribute

context: Context

add_job

add_job(a: Coroutine[Any, Any, Any])

Runs a background job until completion. Only runs when the session is persistent. Args: `a` (Coroutine) – the coroutine that should be run.

Source code in rxxxt/component.py
222
223
224
225
226
227
228
229
230
def add_job(self, a: Coroutine[Any, Any, Any]):
  """
  Runs a background job until completion. Only runs when the session is persistent.
  The coroutine is closed without running when the session is not persistent.
  args:
    a: Coroutine - the coroutine that should be run
  """
  if self.context.config.persistent:
    # Jobs belong in _job_tasks: lc_destroy awaits that list, while _worker_tasks is cancelled.
    self._job_tasks.append(asyncio.create_task(a))
  else: a.close()

add_worker

add_worker(a: Coroutine[Any, Any, Any])

Runs a background worker, which may be cancelled at any time. Only runs when the session is persistent. Args: `a` (Coroutine) – the coroutine that should be run.

Source code in rxxxt/component.py
231
232
233
234
235
236
237
238
239
def add_worker(self, a: Coroutine[Any, Any, Any]):
  """
  Runs a background worker, which may be cancelled at any time. Only runs when the session is persistent.
  The coroutine is closed without running when the session is not persistent.
  args:
    a: Coroutine - the coroutine that should be run
  """
  if not self.context.config.persistent:
    a.close()
    return
  worker_task = asyncio.create_task(a)
  self._worker_tasks.append(worker_task)

lc_destroy async

lc_destroy() -> None
Source code in rxxxt/component.py
252
253
254
255
256
257
258
259
260
261
262
263
async def lc_destroy(self) -> None:
  """
  Tear the component down: run on_before_destroy, wait for job tasks, cancel and wait for worker tasks,
  then run on_after_destroy. Both task lists are cleared once they have been waited on.
  """
  await self.on_before_destroy()
  if len(self._job_tasks) > 0:
    # Job tasks are waited for, not cancelled.
    try: _ = await asyncio.wait(self._job_tasks)
    except asyncio.CancelledError: pass # NOTE(review): swallows cancellation of lc_destroy itself - confirm intended
    self._job_tasks.clear()
  if len(self._worker_tasks) > 0:
    # Worker tasks are cancelled first, then waited for so they can finish unwinding.
    for t in self._worker_tasks: _ = t.cancel()
    try: _ = await asyncio.wait(self._worker_tasks)
    except asyncio.CancelledError: pass
    self._worker_tasks.clear()
  await self.on_after_destroy()

lc_handle_event async

lc_handle_event(event: dict[str, int | float | str | bool | None])
Source code in rxxxt/component.py
265
266
267
268
269
270
async def lc_handle_event(self, event: dict[str, int | float | str | bool | None]):
  handler_name = event.pop("$handler_name", None)
  if isinstance(handler_name, str):
    fn = getattr(self, handler_name, None) # NOTE: this is risky!!
    if isinstance(fn, EventHandler):
      await to_awaitable(cast(EventHandler[..., Any], fn), **event)

lc_init async

lc_init(context: Context) -> None
Source code in rxxxt/component.py
241
242
243
async def lc_init(self, context: Context) -> None:
  """
  Store the context on the component and then run the on_init hook.
  args:
    context: Context - the context assigned to this component
  """
  self.context = context
  await self.on_init()

lc_render async

lc_render() -> Element
Source code in rxxxt/component.py
245
246
247
248
249
250
251
async def lc_render(self) -> Element:
  """
  Render the component between the on_before_update and on_after_update hooks and return the rendered element.
  """
  await self.on_before_update()
  rendered = await to_awaitable(self.render)
  # NOTE: remove any update that was requested during render
  try:
    self.context.execution.pending_updates.remove(self.context.id)
  except KeyError:
    pass
  await self.on_after_update()
  return rendered

on_after_destroy async

on_after_destroy() -> None
Source code in rxxxt/component.py
276
# Hook awaited at the end of lc_destroy, after the background tasks were handled; does nothing by default.
async def on_after_destroy(self) -> None: ...

on_after_update async

on_after_update() -> None
Source code in rxxxt/component.py
274
# Hook awaited by lc_render after render has run; does nothing by default.
async def on_after_update(self) -> None: ...

on_before_destroy async

on_before_destroy() -> None
Source code in rxxxt/component.py
275
# Hook awaited at the start of lc_destroy, before any background task is touched; does nothing by default.
async def on_before_destroy(self) -> None: ...

on_before_update async

on_before_update() -> None
Source code in rxxxt/component.py
273
# Hook awaited by lc_render before render runs; does nothing by default.
async def on_before_update(self) -> None: ...

on_init async

on_init() -> None
Source code in rxxxt/component.py
272
# Hook awaited by lc_init after the context was assigned; does nothing by default.
async def on_init(self) -> None: ...

render abstractmethod

render() -> Element | Awaitable[Element]
Source code in rxxxt/component.py
219
220
# Must be implemented by subclasses; may return the element directly or an awaitable of it.
@abstractmethod
def render(self) -> Element | Awaitable[Element]: ...

tonode

tonode(context: Context) -> Node
Source code in rxxxt/component.py
278
# Wraps this component in a ComponentNode bound to the given context.
def tonode(self, context: Context) -> 'Node': return ComponentNode(context, self)

context_state

context_state(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/component.py
133
134
def context_state(default_factory: Callable[[], T], name: str | None = None):
  """
  Build a StateDescriptor from get_context_state_key, the default factory and an optional state name.
  args:
    default_factory: Callable[[], T] - passed through to the descriptor
    name: str | None - passed to the descriptor as state_name
  """
  return StateDescriptor(get_context_state_key, default_factory, state_name=name)

context_state_box

context_state_box(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/component.py
142
143
def context_state_box(default_factory: Callable[[], T], name: str | None = None):
  """
  Build a StateBoxDescriptor from get_context_state_key, the default factory and an optional state name.
  args:
    default_factory: Callable[[], T] - passed through to the descriptor
    name: str | None - passed to the descriptor as state_name
  """
  return StateBoxDescriptor(get_context_state_key, default_factory, state_name=name)

event_handler

event_handler(**kwargs: Any)
Source code in rxxxt/component.py
199
200
201
202
def event_handler(**kwargs: Any):
  """
  Decorator factory that turns a method into a ClassEventHandler.
  The keyword arguments are validated as InputEventDescriptorOptions once, when the factory is called.
  """
  options = InputEventDescriptorOptions.model_validate(kwargs)
  def _inner(fn: Callable[Concatenate[Any, FNP], FNR]) -> ClassEventHandler[FNP, FNR]:
    handler = ClassEventHandler(fn, options)
    return handler
  return _inner

EventHandler

EventHandler(fn: Callable[Concatenate[Any, FNP], FNR], options: InputEventDescriptorOptions, instance: Any)

Bases: ClassEventHandler[FNP, FNR], Generic[FNP, FNR], CustomAttribute, InputEventDescriptorGenerator

Methods:

Name Description
__call__
bind
get_key_values

Attributes:

Name Type Description
descriptor
Source code in rxxxt/component.py
153
154
155
156
def __init__(self, fn: Callable[Concatenate[Any, FNP], FNR], options: InputEventDescriptorOptions, instance: Any) -> None:
  """
  Bind an event handler function to a component instance.
  args:
    fn: the handler function; it is wrapped with validate_call before being passed to the base class
    options: InputEventDescriptorOptions - passed to the base class
    instance: Any - the owning component
  raises:
    ValueError - when instance is not a Component
  """
  super().__init__(validate_call(fn), options)
  if not isinstance(instance, Component): raise ValueError("The provided instance must be a component!")
  self._instance: 'Component' = instance

descriptor property

descriptor

__call__

__call__(*args: args, **kwargs: kwargs) -> FNR
Source code in rxxxt/component.py
166
# Calls the wrapped function with the bound component instance as its first argument.
def __call__(self, *args: FNP.args, **kwargs: FNP.kwargs) -> FNR: return self._fn(self._instance, *args, **kwargs)

bind

bind(**kwargs: int | float | str | bool | None)
Source code in rxxxt/component.py
168
169
170
171
172
173
174
def bind(self, **kwargs: int | float | str | bool | None):
  if len(kwargs) == 0: return
  new_options = InputEventDescriptorOptions.model_validate({
    **self._options.model_dump(),
    "default_params": (self._options.default_params or {}) | kwargs
  })
  return EventHandler(self._fn, new_options, self._instance)

get_key_values

get_key_values(original_key: str)
Source code in rxxxt/component.py
176
177
178
179
def get_key_values(self, original_key: str):
  """
  Produce the attribute pair for this handler: the key becomes "rxxxt-on-<name>" and the value is the
  base64 encoded JSON dump of the descriptor (defaults excluded).
  args:
    original_key: str - the attribute name the handler was assigned to; must start with "on"
  raises:
    ValueError - when original_key does not start with "on"
  """
  if not original_key.startswith("on"):
    raise ValueError("Event handler must be applied to an attribute starting with 'on'.")
  descriptor_json = self.descriptor.model_dump_json(exclude_defaults=True)
  encoded = base64.b64encode(descriptor_json.encode("utf-8")).decode("utf-8")
  event_name = original_key[2:]
  return ((f"rxxxt-on-{event_name}", encoded),)

global_state

global_state(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/component.py
130
131
def global_state(default_factory: Callable[[], T], name: str | None = None):
  """
  Build a StateDescriptor from get_global_state_key, the default factory and an optional state name.
  args:
    default_factory: Callable[[], T] - passed through to the descriptor
    name: str | None - passed to the descriptor as state_name
  """
  return StateDescriptor(get_global_state_key, default_factory, state_name=name)

global_state_box

global_state_box(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/component.py
139
140
def global_state_box(default_factory: Callable[[], T], name: str | None = None):
  """
  Build a StateBoxDescriptor from get_global_state_key, the default factory and an optional state name.
  args:
    default_factory: Callable[[], T] - passed through to the descriptor
    name: str | None - passed to the descriptor as state_name
  """
  return StateBoxDescriptor(get_global_state_key, default_factory, state_name=name)

HandleNavigate

HandleNavigate(location: str)

Bases: CustomAttribute

Methods:

Name Description
get_key_values

Attributes:

Name Type Description
location
Source code in rxxxt/component.py
205
206
207
def __init__(self, location: str) -> None:
  """
  Remember the location that get_key_values places into the navigate call.
  args:
    location: str - the navigation target
  """
  super().__init__()
  self.location = location

location instance-attribute

location = location

get_key_values

get_key_values(original_key: str) -> tuple[tuple[str, str], ...]
Source code in rxxxt/component.py
209
210
def get_key_values(self, original_key: str) -> tuple[tuple[str, str],...]:
  """
  Produce the attribute pair that calls window.rxxxt.navigate with the stored location.
  The location is escaped for use inside the single-quoted JavaScript string, so quotes, backslashes
  and control characters cannot end the string early.
  args:
    original_key: str - the attribute name, returned unchanged as the key
  """
  import json
  # json.dumps escapes backslashes, double quotes and control characters; the surrounding double
  # quotes are stripped and single quotes are escaped because the literal below is single-quoted.
  escaped = json.dumps(str(self.location), ensure_ascii=False)[1:-1].replace("'", "\\'")
  return ((original_key, f"window.rxxxt.navigate('{escaped}');"),)

local_state

local_state(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/component.py
127
128
def local_state(default_factory: Callable[[], T], name: str | None = None):
  """
  Build a StateDescriptor from get_local_state_key, the default factory and an optional state name.
  args:
    default_factory: Callable[[], T] - passed through to the descriptor
    name: str | None - passed to the descriptor as state_name
  """
  return StateDescriptor(get_local_state_key, default_factory, state_name=name)

local_state_box

local_state_box(default_factory: Callable[[], T], name: str | None = None)
Source code in rxxxt/component.py
136
137
def local_state_box(default_factory: Callable[[], T], name: str | None = None):
  """
  Build a StateBoxDescriptor from get_local_state_key, the default factory and an optional state name.
  args:
    default_factory: Callable[[], T] - passed through to the descriptor
    name: str | None - passed to the descriptor as state_name
  """
  return StateBoxDescriptor(get_local_state_key, default_factory, state_name=name)

StateBox

StateBox(key: str, state: State, default_factory: Callable[[], T], adapter: TypeAdapter[T])

Bases: Generic[T], StateCell

Methods:

Name Description
__enter__
__exit__
consume
detach
produce
update

Attributes:

Name Type Description
key
value
Source code in rxxxt/component.py
13
14
15
16
17
18
19
20
21
22
23
24
25
def __init__(self, key: str, state: State, default_factory: Callable[[], T], adapter: TypeAdapter[T]) -> None:
  """
  Attach the box to the state entry for key and load its initial value.
  args:
    key: str - the state key this box belongs to
    state: State - the state the key is looked up in
    default_factory: Callable[[], T] - produces the value when the stored JSON cannot be validated
    adapter: TypeAdapter[T] - validates the stored JSON
  """
  super().__init__()
  self._key = key
  self._state = state
  self._adapter = adapter
  self._value: T

  key_state = state.get(key)
  key_state.add_consumer(self) # registered before the value is read
  try: self._value = adapter.validate_json(key_state.get())
  except ValueError:
    # Fall back to the default and write this box back to the key state.
    # NOTE(review): presumably the adapter's validation error is a ValueError subclass - confirm.
    self._value = default_factory()
    key_state.set(self)

key property

key

value property writable

value

__enter__

__enter__()
Source code in rxxxt/component.py
27
# Entering the box as a context manager yields its current value.
def __enter__(self): return self.value

__exit__

__exit__(*_)
Source code in rxxxt/component.py
28
# Leaving the context always calls update(); the exception details are ignored.
def __exit__(self, *_): self.update()

consume

consume(key: str, producer: Callable[[], str]) -> Any
Source code in rxxxt/component.py
47
48
def consume(self, key: str, producer: Callable[[], str]) -> Any:
  """
  Replace the held value with the validated JSON returned by the producer.
  args:
    key: str - not used by this implementation
    producer: Callable[[], str] - returns the JSON text to validate
  """
  raw_json = producer()
  self._value = self._adapter.validate_json(raw_json)

detach

detach(key: str) -> Any
Source code in rxxxt/component.py
50
51
def detach(self, key: str) -> Any:
  """
  Drop the held value by deleting the _value attribute; key is not used.
  """
  del self._value

produce

produce(key: str) -> str
Source code in rxxxt/component.py
44
45
def produce(self, key: str) -> str:
  """
  Serialise the held value with the adapter and return the decoded JSON text.
  args:
    key: str - not used by this implementation
  """
  encoded = self._adapter.dump_json(self._value)
  return encoded.decode()

update

update()
Source code in rxxxt/component.py
41
42
def update(self):
  """
  Write this box to the state entry of its key.
  """
  key_state = self._state.get(self._key)
  key_state.set(self)

add_attributes

add_attributes(base: HTMLAttributes, **kwargs: HTMLAttributeValue)
Source code in rxxxt/elements.py
151
152
def add_attributes(base: HTMLAttributes, **kwargs: HTMLAttributeValue):
  """
  Merge the keyword attributes with base through merge_attributes.
  The keyword attributes are passed as the first argument and base as the second.
  args:
    base: HTMLAttributes - the existing attributes
    kwargs: the attributes to add
  """
  return merge_attributes(kwargs, base)

class_map

class_map(map: dict[str, bool])
Source code in rxxxt/elements.py
145
146
def class_map(map: dict[str, bool]):
  """
  Join the keys whose value is truthy with single spaces, in the mapping's iteration order.
  args:
    map: dict[str, bool] - class name to enabled flag
  """
  enabled = []
  for class_name, active in map.items():
    if active:
      enabled.append(class_name)
  return " ".join(enabled)

CustomAttribute

Bases: ABC

Methods:

Name Description
get_key_values

get_key_values abstractmethod

get_key_values(original_key: str) -> tuple[tuple[str, str | None], ...]
Source code in rxxxt/elements.py
15
16
# Must be implemented by subclasses: given the attribute name the value was assigned to,
# return the (key, value) pairs to use in its place; a value may be None.
@abstractmethod
def get_key_values(self, original_key: str) -> tuple[tuple[str, str | None],...]: ...

El

Element

Bases: ABC

Methods:

Name Description
tonode

tonode abstractmethod

tonode(context: Context) -> Node
Source code in rxxxt/elements.py
11
12
# Must be implemented by subclasses: build the Node for this element in the given context.
@abstractmethod
def tonode(self, context: Context) -> Node: ...

ElementContent module-attribute

ElementContent = Iterable[Element | str]

ElementFactory

Bases: Protocol

Methods:

Name Description
__call__

__call__

__call__() -> Element
Source code in rxxxt/elements.py
140
# Protocol signature: a callable without arguments that returns an Element.
def __call__(self) -> Element: ...

HTMLAttributes module-attribute

HTMLAttributes = dict[str, str | bool | int | float | CustomAttribute | None]

HTMLAttributeValue module-attribute

HTMLAttributeValue = str | bool | int | float | CustomAttribute | None

HTMLElement

HTMLElement(context: Context, tag: str, attributes: HTMLAttributes, content: ElementContent)
Source code in rxxxt/elements.py
76
77
78
@fn_element
def HTMLElement(context: Context, tag: str, attributes: HTMLAttributes, content: ElementContent):
  """
  Build an ElementNode for tag from the converted attributes and the content converted to ordered nodes.
  """
  attribute_items = _html_attributes_to_kv(attributes)
  child_nodes = _element_content_to_ordered_nodes(context, content)
  return ElementNode(context, tag, attribute_items, child_nodes)

HTMLFragment

HTMLFragment(context: Context, content: ElementContent)
Source code in rxxxt/elements.py
68
69
70
@fn_element
def HTMLFragment(context: Context, content: ElementContent):
  """
  Build a FragmentNode from the content converted to ordered nodes.
  """
  child_nodes = _element_content_to_ordered_nodes(context, content)
  return FragmentNode(context, child_nodes)

HTMLVoidElement

HTMLVoidElement(context: Context, tag: str, attributes: HTMLAttributes)
Source code in rxxxt/elements.py
72
73
74
@fn_element
def HTMLVoidElement(context: Context, tag: str, attributes: HTMLAttributes):
  """
  Build a VoidElementNode for tag from the converted attributes; it takes no content.
  """
  attribute_items = _html_attributes_to_kv(attributes)
  return VoidElementNode(context, tag, attribute_items)

KeyedElement

KeyedElement(context: Context, key: str, element: Element)
Source code in rxxxt/elements.py
80
81
82
83
84
@fn_element
def KeyedElement(context: Context, key: str, element: Element):
  """
  Render element in a context whose last index is replaced by key.
  When the index cannot be replaced (replace_index raises ValueError) the failure is logged at debug
  level and the element is rendered in the unchanged context.
  """
  try: context = context.replace_index(key)
  # Lazy %-style arguments: passing the exception without a placeholder makes logging fail to format the record.
  except ValueError as e: logging.debug("Failed to replace index with key %s: %s", key, e)
  return element.tonode(context)

lazy_element

lazy_element(fn: Callable[Concatenate[Context, FNP], Element]) -> Callable[FNP, Element]
Source code in rxxxt/elements.py
63
64
65
66
def lazy_element(fn: Callable[Concatenate[Context, FNP], Element]) -> Callable[FNP, 'Element']:
  """
  Turn a function that needs the context to create an element into an element factory.
  The function is called with the context, and the element it returns is converted to a node in that same context.
  """
  def _inner(context: Context, *args: FNP.args, **kwargs: FNP.kwargs) -> Node:
    element = fn(context, *args, **kwargs)
    return element.tonode(context)
  return fn_element(_inner)

merge_attributes

merge_attributes(a: HTMLAttributes, b: HTMLAttributes)
Source code in rxxxt/elements.py
148
149
def merge_attributes(a: HTMLAttributes, b: HTMLAttributes):
  """
  Merge two attribute dicts: the items of a followed by the items of b are passed through
  _merge_attribute_items and collected into a new dict.
  """
  all_items = itertools.chain(a.items(), b.items())
  merged_items = _merge_attribute_items(all_items)
  return dict(merged_items)

ScriptContent

ScriptContent(context: Context, script: str)
Source code in rxxxt/elements.py
102
103
104
@fn_element
def ScriptContent(context: Context, script: str):
  """
  Build a TextNode from script with every "</" replaced by "<\\/".
  """
  # Presumably keeps the script text from closing its surrounding tag early - confirm against the renderer.
  return TextNode(context, script.replace("</", "<\\/"))

TextElement

TextElement(context: Context, text: str)
Source code in rxxxt/elements.py
94
95
96
@fn_element
def TextElement(context: Context, text: str):
  """
  Build a TextNode from text after escaping it with html.escape.
  """
  return TextNode(context, html.escape(text))

UnescapedHTMLElement

UnescapedHTMLElement(context: Context, text: str)
Source code in rxxxt/elements.py
 98
 99
100
@fn_element
def UnescapedHTMLElement(context: Context, text: str):
  """
  Build a TextNode from text exactly as given; no escaping is applied here.
  """
  return TextNode(context, text)

VEl

WithRegistered

WithRegistered(context: Context, register: dict[str, Any], child: Element)
Source code in rxxxt/elements.py
90
91
92
@fn_element
def WithRegistered(context: Context, register: dict[str, Any], child: Element):
  """
  Convert child to a node in the context returned by context.update_registry(register).
  """
  return child.tonode(context.update_registry(register))

Context dataclass

Context(id: ContextStack, state: State, registry: dict[str, Any], config: ContextConfig, execution: Execution)

Classes:

Name Description
StateConsumer

Methods:

Name Description
__hash__
add_query_selector_event
add_window_event
delete_cookie
emit
get_header
match_path
navigate
registered
remove_query_selector_event
remove_window_event
replace_index
request_update
set_cookie
sub
subscribe
update_registry
use_websocket

Attributes:

Name Type Description
config ContextConfig
cookies dict[str, str]
execution Execution
id ContextStack
location
path
query_string
registry dict[str, Any]
sid
stack_sids
state State
update_consumer

config instance-attribute

config: ContextConfig

cookies property

cookies: dict[str, str]

execution instance-attribute

execution: Execution

id instance-attribute

id: ContextStack

location property

location

path property

path

query_string property

query_string

registry instance-attribute

registry: dict[str, Any]

sid cached property

sid

stack_sids property

stack_sids

state instance-attribute

state: State

update_consumer cached property

update_consumer

StateConsumer

StateConsumer(context: Context)

Bases: StateConsumer

Methods:

Name Description
consume
detach

Attributes:

Name Type Description
context
Source code in rxxxt/execution.py
100
# Keeps the context whose update is requested by consume and detach.
def __init__(self, context: 'Context') -> None: self.context = context
context instance-attribute
context = context
consume
consume(key: str, producer: Callable[[], str]) -> Any
Source code in rxxxt/execution.py
101
# Requests an update of the context; key and producer are not used.
def consume(self, key: str, producer: Callable[[], str]) -> Any: self.context.request_update()
detach
detach(key: str) -> Any
Source code in rxxxt/execution.py
102
# Requests an update of the context; key is not used.
def detach(self, key: str) -> Any: self.context.request_update()

__hash__

__hash__() -> int
Source code in rxxxt/execution.py
104
105
def __hash__(self) -> int:
  """
  Hash the context by its id only.
  """
  return hash(self.id)

add_query_selector_event

add_query_selector_event(selector: str, name: str, descriptor: InputEventDescriptor | InputEventDescriptorGenerator, all: bool = False)
Source code in rxxxt/execution.py
171
172
def add_query_selector_event(self, selector: str, name: str, descriptor: InputEventDescriptor | InputEventDescriptorGenerator, all: bool = False):
  """
  Forward to _modify_query_selector_event with the mode "add".
  """
  self._modify_query_selector_event(selector, name, descriptor, all, "add")

add_window_event

add_window_event(name: str, descriptor: InputEventDescriptor | InputEventDescriptorGenerator)
Source code in rxxxt/execution.py
168
169
def add_window_event(self, name: str, descriptor: InputEventDescriptor | InputEventDescriptorGenerator):
  """
  Forward to _modify_window_event with the mode "add".
  """
  self._modify_window_event(name, descriptor, "add")

delete_cookie

delete_cookie(name: str, mirror_state: bool = True)
Source code in rxxxt/execution.py
200
201
202
203
def delete_cookie(self, name: str, mirror_state: bool = True):
  """
  Expire a cookie by setting it with max_age=-1.
  args:
    name: str - the cookie to delete
    mirror_state: bool - also rewrite the "!header;cookie" state entry without this cookie
  """
  self.set_cookie(name=name, max_age=-1, mirror_state=False)
  if not mirror_state:
    return
  remaining = "; ".join(f"{k}={v}" for k, v in self.cookies.items() if k != name)
  self.state.set_many({ "!header;cookie": remaining })

emit

emit(name: str, data: dict[str, int | float | str | bool | None])
Source code in rxxxt/execution.py
165
166
def emit(self, name: str, data: dict[str, int | float | str | bool | None]):
  self.execution.add_output_event(dict(event="custom", name=name, data=data))

get_header

get_header(name: str) -> tuple[str, ...]
Source code in rxxxt/execution.py
157
158
159
160
def get_header(self, name: str) -> tuple[str, ...]:
  """
  Return the lines stored under the state key "!header;<name>" as a tuple, or an empty tuple when the
  lookup yields None.
  args:
    name: str - the header name used in the state key
  """
  stored = self._get_state_str_subscribe(f"!header;{name}")
  return () if stored is None else tuple(stored.splitlines())

match_path

match_path(pattern: str, re_flags: int = IGNORECASE)
Source code in rxxxt/execution.py
154
155
def match_path(self, pattern: str, re_flags: int = re.IGNORECASE):
  """
  Match pattern against this context's path using the module-level match_path.
  args:
    pattern: str - the path pattern
    re_flags: int - regex flags passed through; case-insensitive by default
  """
  return match_path(pattern, self.path, re_flags)

navigate

navigate(location: str)
Source code in rxxxt/execution.py
180
181
182
183
def navigate(self, location: str):
  """
  Emit a "navigate" output event for location.
  A location containing a colon is treated as a full url: the "!location" state is left alone and the
  event carries requires_refresh=True. Otherwise "!location" is set and requires_refresh is None.
  args:
    location: str - the navigation target
  """
  is_full_url = ":" in location # colon means full url
  if not is_full_url:
    self.state.get("!location").set(location)
  navigate_event = { "event": "navigate", "location": location, "requires_refresh": is_full_url or None }
  self.execution.add_output_event(navigate_event)

registered

registered(name: str, t: type[T]) -> T
Source code in rxxxt/execution.py
149
150
151
152
def registered(self, name: str, t: type[T]) -> T:
  """
  Return the registry entry for name after checking that it is an instance of t.
  args:
    name: str - the registry key
    t: type[T] - the expected type
  raises:
    TypeError - when the entry is missing or not an instance of t
  """
  val = self.registry.get(name)
  if isinstance(val, t):
    return val
  raise TypeError(f"Invalid type in get_registered '{type(val)}'!")

remove_query_selector_event

remove_query_selector_event(selector: str, name: str, descriptor: InputEventDescriptor | InputEventDescriptorGenerator, all: bool = False)
Source code in rxxxt/execution.py
177
178
def remove_query_selector_event(self, selector: str, name: str, descriptor: InputEventDescriptor | InputEventDescriptorGenerator, all: bool = False):
  """Forward to `_modify_query_selector_event`, passing "remove" as the final argument."""
  action = "remove"
  self._modify_query_selector_event(selector, name, descriptor, all, action)

remove_window_event

remove_window_event(name: str, descriptor: InputEventDescriptor | InputEventDescriptorGenerator)
Source code in rxxxt/execution.py
174
175
def remove_window_event(self, name: str, descriptor: InputEventDescriptor | InputEventDescriptorGenerator):
  """Forward to `_modify_window_event`, passing "remove" as the final argument."""
  action = "remove"
  self._modify_window_event(name, descriptor, action)

replace_index

replace_index(key: str)
Source code in rxxxt/execution.py
145
146
147
def replace_index(self, key: str):
  """
  Return a copy of this dataclass whose last `id` entry is replaced by `key`.
  Only an integer last entry (an index) can be replaced.

  Raises:
    ValueError: if `id` is empty or its last entry is not an `int`.
  """
  # Guard against an empty id so the ValueError below is raised instead of an IndexError from `self.id[-1]`.
  if not self.id or not isinstance(self.id[-1], int):
    raise ValueError("No index to replace!")
  return dataclasses.replace(self, id=self.id[:-1] + (key,))

request_update

request_update()
Source code in rxxxt/execution.py
162
def request_update(self):
  """Forward this context's `id` to `self.execution.request_update`."""
  self.execution.request_update(self.id)

set_cookie

set_cookie(name: str, value: str | None = None, expires: datetime | None = None, path: str | None = None, secure: bool | None = None, http_only: bool | None = None, domain: str | None = None, max_age: int | None = None, mirror_state: bool = True)
Source code in rxxxt/execution.py
187
188
189
190
191
192
193
194
195
196
197
198
def set_cookie(self, name: str, value: str | None = None, expires: datetime | None = None, path: str | None = None,
              secure: bool | None = None, http_only: bool | None = None, domain: str | None = None, max_age: int | None = None, mirror_state: bool = True):
  if not re.match(r'^[^=;, \t\n\r\f\v]+$', name): raise ValueError("Invalid cookie name")
  if value is not None and not re.match(r'^[^;, \t\n\r\f\v]+$', value): raise ValueError("Invalid value.")
  if domain is not None and not re.match(r'^[^;, \t\n\r\f\v]+$', domain): raise ValueError("Invalid domain.")
  if path is not None and not re.match(r'^[^\x00-\x20;,\s]+$', path): raise ValueError("Invalid path.")

  expires_str = None if expires is None else expires.isoformat()

  self.execution.add_output_event(dict(event="set-cookie", name=name, value=value, expires=expires_str, path=path, secure=secure, http_only=http_only, domain=domain, max_age=max_age))
  if mirror_state:
    self.state.set_many({ "!header;cookie": "; ".join(f"{k}={v}" for k, v in (self.cookies | { name: value }).items()) })

sub

sub(key: ContextStackKey)
Source code in rxxxt/execution.py
144
def sub(self, key: ContextStackKey):
  """Return a copy of this dataclass with `key` appended to its `id` tuple."""
  child_id = self.id + (key,)
  return dataclasses.replace(self, id=child_id)

subscribe

subscribe(key: str)
Source code in rxxxt/execution.py
163
def subscribe(self, key: str):
  """Register `self.update_consumer` as a consumer of the state entry for `key`."""
  key_state = self.state.get(key)
  key_state.add_consumer(self.update_consumer)

update_registry

update_registry(registry: dict[str, Any])
Source code in rxxxt/execution.py
148
def update_registry(self, registry: dict[str, Any]):
  """Return a copy of this dataclass whose registry is `self.registry | registry` (entries in `registry` win)."""
  merged = self.registry | registry
  return dataclasses.replace(self, registry=merged)

use_websocket

use_websocket(websocket: bool = True)
Source code in rxxxt/execution.py
185
def use_websocket(self, websocket: bool = True):
  """Emit a `use-websocket` output event carrying the `websocket` flag."""
  event = { "event": "use-websocket", "websocket": websocket }
  self.execution.add_output_event(event)

InputEventDescriptorOptions

Bases: BaseModel

Attributes:

Name Type Description
debounce int | None
default_params dict[str, int | float | str | bool | None] | None
no_trigger bool
prevent_default bool
throttle int | None

debounce class-attribute instance-attribute

debounce: int | None = None

default_params class-attribute instance-attribute

default_params: dict[str, int | float | str | bool | None] | None = None

no_trigger class-attribute instance-attribute

no_trigger: bool = False

prevent_default class-attribute instance-attribute

prevent_default: bool = False

throttle class-attribute instance-attribute

throttle: int | None = None

State

State()

Methods:

Name Description
cleanup
delete
destroy
get
get_key_values
set_many

Attributes:

Name Type Description
keys
Source code in rxxxt/state.py
68
69
def __init__(self) -> None:
  """Create an empty store that maps state keys to their `KeyState` objects."""
  self._key_states: dict[str, KeyState] = dict()

keys property

keys

cleanup

cleanup(inactive_prefixes: Set[str])
Source code in rxxxt/state.py
92
93
94
95
96
def cleanup(self, inactive_prefixes: Set[str]):
  """
  Delete every stored key that is not among the active keys computed by
  `_get_active_keys(inactive_prefixes)`.
  """
  active_keys = self._get_active_keys(inactive_prefixes)
  # Snapshot the keys first: `delete` removes entries from `_key_states` while we loop.
  inactive_keys = tuple(key for key in self._key_states if key not in active_keys)
  for key in inactive_keys:
    # No `return` here: every inactive key must be removed, not only the first one.
    self.delete(key)

delete

delete(key: str)
Source code in rxxxt/state.py
83
84
85
86
def delete(self, key: str):
  """Remove `key` from the store and destroy its state object; keys that are not stored are ignored."""
  removed = self._key_states.pop(key, None)
  if removed is None: return
  removed.destroy()

destroy

destroy()
Source code in rxxxt/state.py
 98
 99
100
101
def destroy(self):
  """Destroy every stored key state, then empty the store."""
  store = self._key_states
  for entry in store.values(): entry.destroy()
  store.clear()

get

get(key: str)
Source code in rxxxt/state.py
74
75
76
77
78
def get(self, key: str):
  """Return the state object stored for `key`, creating and storing `KeyState(key, None)` when there is none."""
  existing = self._key_states.get(key)
  if existing is not None:
    return existing
  created = KeyState(key, None)
  self._key_states[key] = created
  return created

get_key_values

get_key_values(inactive_prefixes: Set[str])
Source code in rxxxt/state.py
88
89
90
def get_key_values(self, inactive_prefixes: Set[str]):
  """
  Return a `{key: value}` dict for every stored key that is active
  (per `_get_active_keys(inactive_prefixes)`) and whose state reports `has_value`.
  """
  active_keys = self._get_active_keys(inactive_prefixes)
  values = {}
  for key, key_state in self._key_states.items():
    if key not in active_keys: continue
    if not key_state.has_value: continue
    values[key] = key_state.get()
  return values

set_many

set_many(kvs: dict[str, str])
Source code in rxxxt/state.py
80
81
def set_many(self, kvs: dict[str, str]):
  """Assign each value in `kvs` to the state object that `self.get` returns for its key."""
  for key, value in kvs.items():
    key_state = self.get(key)
    key_state.set(value)

JWTError

Bases: Exception

JWTManager

JWTManager(secret: bytes, max_age: timedelta, algorithm: str = 'HS512')

Classes:

Name Description
JWTHeader
JWTPayloadValidations

Methods:

Name Description
sign
verify

Attributes:

Name Type Description
JWTPayloadAdapter
Source code in rxxxt/helpers.py
83
84
85
86
87
88
89
def __init__(self, secret: bytes, max_age: timedelta, algorithm: str = "HS512") -> None:
  """
  Store the signing parameters and pre-encode the JWT header segment.
  `algorithm` must be one of HS256, HS384 or HS512; any other value raises `KeyError`.
  """
  super().__init__()
  self._secret = secret
  self._max_age: timedelta = max_age
  self._algorithm = algorithm
  digests = { "HS256": hashlib.sha256, "HS384": hashlib.sha384, "HS512": hashlib.sha512 }
  self._digest = digests[algorithm]
  header = { "typ": "JWT", "alg": self._algorithm }
  # The trailing dot is stored with the header so `sign` can append the payload directly.
  self._jwt_header = _jwt_encode_json(header) + b"."

JWTPayloadAdapter class-attribute instance-attribute

JWTPayloadAdapter = TypeAdapter(dict[str, Any])

JWTHeader

Bases: BaseModel

Attributes:

Name Type Description
alg str
typ Literal['JWT']
alg instance-attribute
alg: str
typ instance-attribute
typ: Literal['JWT']

JWTPayloadValidations

Bases: BaseModel

Methods:

Name Description
is_valid

Attributes:

Name Type Description
exp int
exp instance-attribute
exp: int
is_valid
is_valid()
Source code in rxxxt/helpers.py
79
80
81
def is_valid(self):
  """Return True while the current UTC time has not passed the `exp` timestamp."""
  now = datetime.now(tz=timezone.utc)
  expiry = datetime.fromtimestamp(self.exp, timezone.utc)
  return now <= expiry

sign

sign(extra_fields: dict[str, Any])
Source code in rxxxt/helpers.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def sign(self, extra_fields: dict[str, Any]):
  """
  Build a signed token string from the pre-encoded header and a payload made of an
  `exp` claim (now + max age, as integer seconds) merged with `extra_fields`.

  Any non-`JWTError` exception raised along the way is wrapped in `JWTError`.
  """
  try:
    expiry = datetime.now(tz=timezone.utc) + self._max_age
    payload = _jwt_encode_json({ "exp": int(expiry.timestamp()), **extra_fields })
    # `_jwt_header` already ends with the "." separator.
    signing_input = self._jwt_header + payload
    signature = hmac.digest(self._secret, signing_input, self._digest)
    token = signing_input + b"." + _jwt_b64url_encode(signature)
    return token.decode()
  except JWTError:
    raise
  except Exception as e:
    raise JWTError(e)

verify

verify(token: str)
Source code in rxxxt/helpers.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def verify(self, token: str):
  """
  Check `token` and return its payload without the `exp` claim.

  Checks run in this order: three dot-separated segments, header algorithm equals the
  configured one, HMAC signature matches (constant-time comparison), `exp` not passed.
  Every failure is raised as `JWTError`; other exceptions are wrapped in `JWTError`.
  """
  try:
    segments = token.encode().split(b".")
    if len(segments) != 3: raise JWTError("invalid format (expected 3 parts)")
    header_b64, payload_b64, signature_b64 = segments

    header = JWTManager.JWTHeader.model_validate_json(jwt_b64url_decode(header_b64))
    if header.alg != self._algorithm: raise JWTError("invalid algorithm in header")

    expected = hmac.digest(self._secret, header_b64 + b"." + payload_b64, self._digest)
    if not hmac.compare_digest(jwt_b64url_decode(signature_b64), expected):
      raise JWTError("invalid JWT signature!")

    # The payload is only parsed after the signature has been accepted.
    payload = JWTManager.JWTPayloadAdapter.validate_json(jwt_b64url_decode(payload_b64))
    if not JWTManager.JWTPayloadValidations.model_validate(payload).is_valid():
      raise JWTError("token expired")

    payload.pop("exp", None)
    return payload
  except JWTError:
    raise
  except Exception as e:
    raise JWTError(e)

match_path

match_path(pattern: str, path: str, re_flags: int = IGNORECASE)
Source code in rxxxt/helpers.py
57
58
def match_path(pattern: str, path: str, re_flags: int = re.IGNORECASE):
  """Obtain a matcher for `pattern` and `re_flags` from `_compile_matcher` and return its result for `path`."""
  matcher = _compile_matcher(pattern, re_flags)
  return matcher(path)

default_page

default_page(header: Element, content: Element, body_end: Element)
Source code in rxxxt/page.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def default_page(header: Element, content: Element, body_end: Element):
  """
  Build the default page fragment: a doctype element followed by an `html` element
  whose `head` holds the charset and viewport meta tags plus `header`, and whose
  `body` holds `content` followed by `body_end`.
  """
  doctype = VEl["!DOCTYPE"](html=None)
  head = El.head(content=[
    VEl.meta(charset="UTF-8"),
    VEl.meta(name="viewport", content="width=device-width, initial-scale=1.0"),
    header
  ])
  body = El.body(content=[ content, body_end ])
  document = El.html(content=[ head, body ])
  return HTMLFragment([ doctype, document ])

PageBuilder

PageBuilder(page_factory: PageFactory = default_page)

Bases: PageFactory

Methods:

Name Description
__call__
add_body_end
add_body_script
add_header
add_header_script
add_stylesheet
Source code in rxxxt/page.py
24
25
26
27
def __init__(self, page_factory: PageFactory = default_page) -> None:
  """Wrap `page_factory` and start with empty lists of extra header and body-end elements."""
  self._page_factory = page_factory
  self._header_elements: list[Element] = []
  self._body_end_elements: list[Element] = []

__call__

__call__(header: Element, content: Element, body_end: Element) -> Element
Source code in rxxxt/page.py
38
39
def __call__(self, header: Element, content: Element, body_end: Element) -> Element:
  """
  Call the wrapped page factory with `header` and `body_end` each extended by the
  elements registered on this builder; `content` is passed through unchanged.
  """
  full_header = HTMLFragment([ header, *self._header_elements ])
  full_body_end = HTMLFragment([ body_end, *self._body_end_elements ])
  return self._page_factory(full_header, content, full_body_end)

add_body_end

add_body_end(el: Element)
Source code in rxxxt/page.py
36
def add_body_end(self, el: Element):
  """Append `el` to the elements placed after the supplied `body_end` element."""
  self._body_end_elements.append(el)

add_body_script

add_body_script(url: str, content: ElementContent = (), **kwargs: HTMLAttributeValue)
Source code in rxxxt/page.py
32
33
def add_body_script(self, url: str, content: ElementContent = (), **kwargs: HTMLAttributeValue):
  """Create an `El.script` element with `src=url` (extra keyword arguments are passed through) and add it via `add_body_end`."""
  script = El.script(src=url, content=content, **kwargs)
  self.add_body_end(script)

add_header

add_header(el: Element)
Source code in rxxxt/page.py
35
def add_header(self, el: Element):
  """Append `el` to the elements placed after the supplied `header` element."""
  self._header_elements.append(el)

add_header_script

add_header_script(url: str, content: ElementContent = (), **kwargs: HTMLAttributeValue)
Source code in rxxxt/page.py
30
31
def add_header_script(self, url: str, content: ElementContent = (), **kwargs: HTMLAttributeValue):
  """Create an `El.script` element with `src=url` (extra keyword arguments are passed through) and add it via `add_header`."""
  script = El.script(src=url, content=content, **kwargs)
  self.add_header(script)

add_stylesheet

add_stylesheet(url: str, **kwargs: HTMLAttributeValue)
Source code in rxxxt/page.py
29
def add_stylesheet(self, url: str, **kwargs: HTMLAttributeValue):
  """Create a `VEl.link` element with `rel="stylesheet"` and `href=url` (extra keyword arguments are passed through) and add it via `add_header`."""
  link = VEl.link(rel="stylesheet", href=url, **kwargs)
  self.add_header(link)

PageFactory

Bases: Protocol

Methods:

Name Description
__call__

__call__

__call__(header: Element, content: Element, body_end: Element) -> Element
Source code in rxxxt/page.py
5
def __call__(self, header: Element, content: Element, body_end: Element) -> Element:
  """Protocol signature: build a page element from the `header`, `content` and `body_end` elements."""
  ...

Router

Router()

Bases: ElementFactory

Classes:

Name Description
RoutedComponent

Methods:

Name Description
__call__
add_route
add_router
route
Source code in rxxxt/router.py
34
35
def __init__(self) -> None:
  """Start with no registered routes; entries are `(pattern, element_factory)` pairs."""
  self._routes: list[tuple[str, ElementFactory]] = list()

RoutedComponent

RoutedComponent(routes: tuple[tuple[str, ElementFactory], ...])

Bases: Component

Methods:

Name Description
on_before_update
render

Attributes:

Name Type Description
params
Source code in rxxxt/router.py
12
13
14
15
def __init__(self, routes: tuple[tuple[str, ElementFactory], ...]):
  """Keep the `(pattern, element_factory)` routes and start without a selected match."""
  super().__init__()
  # Holds (index, factory, params) once a match is selected; see the annotation.
  self._selected_match: tuple[int, ElementFactory, dict[str, str]] | None = None
  self._routes = routes
params class-attribute instance-attribute
params = router_params()
on_before_update async
on_before_update() -> None
Source code in rxxxt/router.py
17
18
19
async def on_before_update(self) -> None:
  """
  Store the result of `_get_current_match()` as the selected match and publish its
  params (third tuple item), or an empty dict when nothing matched.
  """
  match = self._get_current_match()
  self._selected_match = match
  if match is None:
    self.params = typing.cast(dict[str, str], dict())
  else:
    self.params = match[2]
render
render() -> Element
Source code in rxxxt/router.py
21
22
23
24
25
def render(self) -> Element:
  """
  Render the selected route: the element produced by its factory, wrapped in a
  `TaggedElement` tagged with the first item of the match (as a string). Without a
  selected match, an `h1` reading "Not found!" is returned.
  """
  selected = self._selected_match
  if selected is not None:
    tag = str(selected[0])
    child = selected[1]()
    return TaggedElement(tag, child)
  return El.h1(content=["Not found!"])

__call__

__call__() -> Element
Source code in rxxxt/router.py
45
def __call__(self) -> Element:
  """Create a `Router.RoutedComponent` from a tuple snapshot of the currently registered routes."""
  routes = tuple(self._routes)
  return Router.RoutedComponent(routes)

add_route

add_route(pattern: str, element_factory: ElementFactory)
Source code in rxxxt/router.py
38
def add_route(self, pattern: str, element_factory: ElementFactory):
  """Append the `(pattern, element_factory)` pair to the route list."""
  entry = (pattern, element_factory)
  self._routes.append(entry)

add_router

add_router(router: Router)
Source code in rxxxt/router.py
37
def add_router(self, router: 'Router'):
  """Append all routes currently registered on `router` to this router's route list."""
  self._routes.extend(router._routes)

route

route(pattern: str)
Source code in rxxxt/router.py
39
40
41
42
43
def route(self, pattern: str):
  """Return a decorator that registers the decorated factory under `pattern` and returns it unchanged."""
  def _register(fn: ElementFactory):
    self.add_route(pattern, fn)
    return fn
  return _register

router_params

router_params()
Source code in rxxxt/router.py
6
def router_params():
  """Return the `context_state` descriptor for `dict[str, str]` values stored under the name "*rp*"."""
  return context_state(dict[str, str], "*rp*")

AppConfig dataclass

AppConfig(enable_web_socket_state_updates: bool | None = None, disable_http_update_retry: bool | None = None)

Attributes:

Name Type Description
disable_http_update_retry bool | None
enable_web_socket_state_updates bool | None

disable_http_update_retry class-attribute instance-attribute

disable_http_update_retry: bool | None = None

enable_web_socket_state_updates class-attribute instance-attribute

enable_web_socket_state_updates: bool | None = None

default_state_resolver

default_state_resolver() -> JWTStateResolver

Creates a JWTStateResolver. Uses the environment variable JWT_SECRET as its secret, if set, otherwise creates a new random, temporary secret.

Source code in rxxxt/state.py
132
133
134
135
136
137
138
139
140
141
def default_state_resolver() -> JWTStateResolver:
  """
  Creates a JWTStateResolver.
  The secret is the UTF-8 encoded value of the environment variable `JWT_SECRET` when it is set;
  otherwise a fresh random 64-byte secret is generated on every call.
  """

  env_secret = os.getenv("JWT_SECRET", None)
  secret = secrets.token_bytes(64) if env_secret is None else env_secret.encode("utf-8")
  return JWTStateResolver(secret)

JWTStateResolver

JWTStateResolver(secret: bytes, max_age: timedelta | None = None, algorithm: str = 'HS512')

Bases: StateResolver

Methods:

Name Description
create_token
resolve

Attributes:

Name Type Description
StateDataAdapter
Source code in rxxxt/state.py
118
119
120
def __init__(self, secret: bytes, max_age: timedelta | None = None, algorithm: str = "HS512") -> None:
  """Create the underlying `JWTManager`; `max_age` defaults to one day when it is `None`."""
  super().__init__()
  lifetime = max_age if max_age is not None else timedelta(days=1)
  self._jwt_manager = JWTManager(secret, lifetime, algorithm)

StateDataAdapter class-attribute instance-attribute

StateDataAdapter = TypeAdapter(dict[str, str])

create_token

create_token(data: dict[str, str], old_token: str | None) -> str
Source code in rxxxt/state.py
122
123
124
def create_token(self, data: dict[str, str], old_token: str | None) -> str:
  try: return self._jwt_manager.sign({ "d": data })
  except JWTError as e: raise StateResolverError(e)

resolve

resolve(token: str) -> dict[str, str]
Source code in rxxxt/state.py
126
127
128
129
130
def resolve(self, token: str) -> dict[str, str]:
  """
  Verify `token` and return the state data stored in its `d` claim.

  Raises:
    StateResolverError: if verification fails, the `d` claim is missing, or the
      claim does not validate as `dict[str, str]`.
  """
  try:
    payload = self._jwt_manager.verify(token)
    # A correctly signed token may still lack the "d" claim; report that as a
    # resolver error instead of letting a bare KeyError escape.
    return JWTStateResolver.StateDataAdapter.validate_python(payload["d"])
  except (KeyError, ValidationError, JWTError) as e: raise StateResolverError(e)

State

State()

Methods:

Name Description
cleanup
delete
destroy
get
get_key_values
set_many

Attributes:

Name Type Description
keys
Source code in rxxxt/state.py
68
69
def __init__(self) -> None:
  """Create an empty store that maps state keys to their `KeyState` objects."""
  self._key_states: dict[str, KeyState] = dict()

keys property

keys

cleanup

cleanup(inactive_prefixes: Set[str])
Source code in rxxxt/state.py
92
93
94
95
96
def cleanup(self, inactive_prefixes: Set[str]):
  """
  Delete every stored key that is not among the active keys computed by
  `_get_active_keys(inactive_prefixes)`.
  """
  active_keys = self._get_active_keys(inactive_prefixes)
  # Snapshot the keys first: `delete` removes entries from `_key_states` while we loop.
  inactive_keys = tuple(key for key in self._key_states if key not in active_keys)
  for key in inactive_keys:
    # No `return` here: every inactive key must be removed, not only the first one.
    self.delete(key)

delete

delete(key: str)
Source code in rxxxt/state.py
83
84
85
86
def delete(self, key: str):
  """Remove `key` from the store and destroy its state object; keys that are not stored are ignored."""
  removed = self._key_states.pop(key, None)
  if removed is None: return
  removed.destroy()

destroy

destroy()
Source code in rxxxt/state.py
 98
 99
100
101
def destroy(self):
  """Destroy every stored key state, then empty the store."""
  store = self._key_states
  for entry in store.values(): entry.destroy()
  store.clear()

get

get(key: str)
Source code in rxxxt/state.py
74
75
76
77
78
def get(self, key: str):
  """Return the state object stored for `key`, creating and storing `KeyState(key, None)` when there is none."""
  existing = self._key_states.get(key)
  if existing is not None:
    return existing
  created = KeyState(key, None)
  self._key_states[key] = created
  return created

get_key_values

get_key_values(inactive_prefixes: Set[str])
Source code in rxxxt/state.py
88
89
90
def get_key_values(self, inactive_prefixes: Set[str]):
  """
  Return a `{key: value}` dict for every stored key that is active
  (per `_get_active_keys(inactive_prefixes)`) and whose state reports `has_value`.
  """
  active_keys = self._get_active_keys(inactive_prefixes)
  values = {}
  for key, key_state in self._key_states.items():
    if key not in active_keys: continue
    if not key_state.has_value: continue
    values[key] = key_state.get()
  return values

set_many

set_many(kvs: dict[str, str])
Source code in rxxxt/state.py
80
81
def set_many(self, kvs: dict[str, str]):
  """Assign each value in `kvs` to the state object that `self.get` returns for its key."""
  for key, value in kvs.items():
    key_state = self.get(key)
    key_state.set(value)

StateResolver

Bases: ABC

Methods:

Name Description
create_token
resolve

create_token abstractmethod

create_token(data: dict[str, str], old_token: str | None) -> str | Awaitable[str]
Source code in rxxxt/state.py
110
111
@abstractmethod
def create_token(self, data: dict[str, str], old_token: str | None) -> str | Awaitable[str]: pass

resolve abstractmethod

resolve(token: str) -> dict[str, str] | Awaitable[dict[str, str]]
Source code in rxxxt/state.py
112
113
@abstractmethod
def resolve(self, token: str) -> dict[str, str] | Awaitable[dict[str, str]]: pass