Skip to content

Commit

Permalink
WIP: test-server in Python
Browse files Browse the repository at this point in the history
Use aiohttp to create an async server which connects directly with the
bridge, in-process.  This begins to illustrate what a future cockpit-ws
might look like...

Build on Martin's research into Webdriver BiDi and use it as an async
library to control a web browser to connect back to our async webserver.

This is really flaky but showing promise...
  • Loading branch information
allisonkarlitskaya committed Jul 22, 2024
1 parent 8ed363c commit 692597d
Show file tree
Hide file tree
Showing 8 changed files with 1,027 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pkg/base1/test-dbus-common.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ export function common_dbus_tests(channel_options, bus_name) { // eslint-disable
assert.ok(false, "should not be reached");
} catch (ex) {
assert.equal(ex.name, "org.freedesktop.DBus.Error.UnknownMethod", "error name");
assert.equal(ex.message, "Method UnimplementedMethod is not implemented on interface com.redhat.Cockpit.DBusTests.Frobber", "error message");
assert.equal(ex.message, "Unknown method UnimplementedMethod or interface com.redhat.Cockpit.DBusTests.Frobber.", "error message");
}
});

Expand Down
8 changes: 8 additions & 0 deletions pkg/base1/test-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ QUnit.test("headers", assert => {
.get("/mock/headers", null, { Header1: "booo", Header2: "yay value" })
.response((status, headers) => {
assert.equal(status, 201, "status code");

delete headers['Content-Type'];
delete headers.Date;
delete headers.Server;
assert.deepEqual(headers, {
Header1: "booo",
Header2: "yay value",
Expand Down Expand Up @@ -248,6 +252,10 @@ QUnit.test("connection headers", assert => {
.get("/mock/headers", null, { Header2: "yay value", Header0: "extra" })
.response((status, headers) => {
assert.equal(status, 201, "status code");

delete headers['Content-Type'];
delete headers.Date;
delete headers.Server;
assert.deepEqual(headers, {
Header0: "extra",
Header1: "booo",
Expand Down
6 changes: 6 additions & 0 deletions src/cockpit/jsonutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ def get_str(obj: JsonObject, key: str, default: Union[DT, _Empty] = _empty) -> U
return _get(obj, lambda v: typechecked(v, str), key, default)


def get_str_map(obj: JsonObject, key: str, default: DT | _Empty = _empty) -> DT | Mapping[str, str]:
def as_str_map(value: JsonValue) -> Mapping[str, str]:
return {key: typechecked(value, str) for key, value in typechecked(value, dict).items()}
return _get(obj, as_str_map, key, default)


def get_str_or_none(obj: JsonObject, key: str, default: Optional[str]) -> Optional[str]:
return _get(obj, lambda v: None if v is None else typechecked(v, str), key, default)

Expand Down
48 changes: 3 additions & 45 deletions test/common/tap-cdp
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,16 @@
#

import argparse
import os
import re
import subprocess
import sys

from cdp import CDP

tap_line_re = re.compile(r'^(ok [0-9]+|not ok [0-9]+|bail out!|[0-9]+\.\.[0-9]+|# )', re.IGNORECASE)

parser = argparse.ArgumentParser(description="A CDP driver for QUnit which outputs TAP")
parser.add_argument("server", help="path to the test-server and the test page to run", nargs=argparse.REMAINDER)

# Strip prefix from url
# We need this to compensate for automake test generation behavior:
# The tests are called with the path (relative to the build directory) of the testfile,
# but from the build directory. Some tests make assumptions regarding the structure of the
# filename. In order to make sure that they receive the same name, regardless of actual
# build directory location, we need to strip that prefix (path from build to source directory)
# from the filename
parser.add_argument("--strip", dest="strip", help="strip prefix from test file paths")

opts = parser.parse_args()

# argparse sometimes forgets to remove this on argparse.REMAINDER args
if opts.server[0] == '--':
opts.server = opts.server[1:]

# The test file is the last argument, but 'server' might contain arbitrary
# amount of options. We cannot express this with argparse, so take it apart
# manually.
opts.test = opts.server[-1]
opts.server = opts.server[:-1]

if opts.strip and opts.test.startswith(opts.strip):
opts.test = opts.test[len(opts.strip):]
parser.add_argument("url", help="url to the test to run")
args = parser.parse_args()

cdp = CDP("C.utf8")

Expand All @@ -62,21 +37,7 @@ except SystemError:
print('1..0 # skip web browser not found')
sys.exit(0)

# pass the address through a separate fd, so that we can see g_debug() messages (which go to stdout)
(addr_r, addr_w) = os.pipe()
env = os.environ.copy()
env["TEST_SERVER_ADDRESS_FD"] = str(addr_w)

server = subprocess.Popen(opts.server,
stdin=subprocess.DEVNULL,
pass_fds=(addr_w,),
close_fds=True,
env=env)
os.close(addr_w)
address = os.read(addr_r, 1000).decode()
os.close(addr_r)

cdp.invoke("Page.navigate", url=address + '/' + opts.test)
cdp.invoke("Page.navigate", url=args.url)

success = True
ignore_resource_errors = False
Expand Down Expand Up @@ -109,9 +70,6 @@ for t, message in cdp.read_log():
else:
print(message, file=sys.stderr)


server.terminate()
server.wait()
cdp.kill()

if not success:
Expand Down
171 changes: 171 additions & 0 deletions test/pytest/mockdbusservice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import asyncio
import contextlib
import logging
import math
from collections.abc import AsyncIterator
from typing import Iterator

from cockpit._vendor import systemd_ctypes

logger = logging.getLogger(__name__)


# No introspection, manual handling of method calls
class borkety_Bork(systemd_ctypes.bus.BaseObject):
def message_received(self, message: systemd_ctypes.bus.BusMessage) -> bool:
signature = message.get_signature(True) # noqa:FBT003
body = message.get_body()
logger.debug('got Bork message: %s %r', signature, body)

if message.get_member() == 'Echo':
message.reply_method_return(signature, *body)
return True

return False


class com_redhat_Cockpit_DBusTests_Frobber(systemd_ctypes.bus.Object):
finally_normal_name = systemd_ctypes.bus.Interface.Property('s', 'There aint no place like home')
readonly_property = systemd_ctypes.bus.Interface.Property('s', 'blah')
aay = systemd_ctypes.bus.Interface.Property('aay', [], name='aay')
ag = systemd_ctypes.bus.Interface.Property('ag', [], name='ag')
ao = systemd_ctypes.bus.Interface.Property('ao', [], name='ao')
as_ = systemd_ctypes.bus.Interface.Property('as', [], name='as')
ay = systemd_ctypes.bus.Interface.Property('ay', b'ABCabc\0', name='ay')
b = systemd_ctypes.bus.Interface.Property('b', value=False, name='b')
d = systemd_ctypes.bus.Interface.Property('d', 43, name='d')
g = systemd_ctypes.bus.Interface.Property('g', '', name='g')
i = systemd_ctypes.bus.Interface.Property('i', 0, name='i')
n = systemd_ctypes.bus.Interface.Property('n', 0, name='n')
o = systemd_ctypes.bus.Interface.Property('o', '/', name='o')
q = systemd_ctypes.bus.Interface.Property('q', 0, name='q')
s = systemd_ctypes.bus.Interface.Property('s', '', name='s')
t = systemd_ctypes.bus.Interface.Property('t', 0, name='t')
u = systemd_ctypes.bus.Interface.Property('u', 0, name='u')
x = systemd_ctypes.bus.Interface.Property('x', 0, name='x')
y = systemd_ctypes.bus.Interface.Property('y', 42, name='y')

test_signal = systemd_ctypes.bus.Interface.Signal('i', 'as', 'ao', 'a{s(ii)}')

@systemd_ctypes.bus.Interface.Method('', 'i')
def request_signal_emission(self, which_one: int) -> None:
del which_one

self.test_signal(
43,
['foo', 'frobber'],
['/foo', '/foo/bar'],
{'first': (42, 42), 'second': (43, 43)}
)

@systemd_ctypes.bus.Interface.Method('s', 's')
def hello_world(self, greeting: str) -> str:
return f"Word! You said `{greeting}'. I'm Skeleton, btw!"

@systemd_ctypes.bus.Interface.Method('', '')
async def never_return(self) -> None:
await asyncio.sleep(1000000)

@systemd_ctypes.bus.Interface.Method(
['y', 'b', 'n', 'q', 'i', 'u', 'x', 't', 'd', 's', 'o', 'g', 'ay'],
['y', 'b', 'n', 'q', 'i', 'u', 'x', 't', 'd', 's', 'o', 'g', 'ay']
)
def test_primitive_types(
self,
val_byte, val_boolean,
val_int16, val_uint16, val_int32, val_uint32, val_int64, val_uint64,
val_double,
val_string, val_objpath, val_signature,
val_bytestring
):
return [
val_byte + 10,
not val_boolean,
100 + val_int16,
1000 + val_uint16,
10000 + val_int32,
100000 + val_uint32,
1000000 + val_int64,
10000000 + val_uint64,
val_double / math.pi,
f"Word! You said `{val_string}'. Rock'n'roll!",
f"/modified{val_objpath}",
f"assgit{val_signature}",
b"bytestring!\xff\0"
]

@systemd_ctypes.bus.Interface.Method(
['s'],
["a{ss}", "a{s(ii)}", "(iss)", "as", "ao", "ag", "aay"]
)
def test_non_primitive_types(
self,
dict_s_to_s,
dict_s_to_pairs,
a_struct,
array_of_strings,
array_of_objpaths,
array_of_signatures,
array_of_bytestrings
):
return (
f'{dict_s_to_s}{dict_s_to_pairs}{a_struct}'
f'array_of_strings: [{", ".join(array_of_strings)}] '
f'array_of_objpaths: [{", ".join(array_of_objpaths)}] '
f'array_of_signatures: [signature {", ".join(f"'{sig}'" for sig in array_of_signatures)}] '
f'array_of_bytestrings: [{", ".join(x[:-1].decode() for x in array_of_bytestrings)}] '
)


@contextlib.contextmanager
def mock_service_export(bus: systemd_ctypes.Bus) -> Iterator[None]:
slots = [
bus.add_object('/otree/frobber', com_redhat_Cockpit_DBusTests_Frobber()),
bus.add_object('/otree/different', com_redhat_Cockpit_DBusTests_Frobber()),
bus.add_object('/bork', borkety_Bork())
]

yield

for slot in slots:
slot.cancel()


@contextlib.asynccontextmanager
async def well_known_name(bus: systemd_ctypes.Bus, name: str, flags: int = 0) -> AsyncIterator[None]:
result, = await bus.call_method_async(
'org.freedesktop.DBus', '/org/freedesktop/DBus', 'org.freedesktop.DBus', 'RequestName', 'su', name, flags
)
if result != 1:
raise RuntimeError(f'Cannot register name {name}: {result}')

try:
yield

finally:
result, = await bus.call_method_async(
'org.freedesktop.DBus', '/org/freedesktop/DBus', 'org.freedesktop.DBus', 'ReleaseName', 's', name
)
if result != 1:
raise RuntimeError(f'Cannot release name {name}: {result}')


@contextlib.asynccontextmanager
async def mock_dbus_service_on_user_bus() -> AsyncIterator[None]:
user = systemd_ctypes.Bus.default_user()
async with (
well_known_name(user, 'com.redhat.Cockpit.DBusTests.Test'),
well_known_name(user, 'com.redhat.Cockpit.DBusTests.Second'),
):
with mock_service_export(user):
yield


async def main():
async with mock_dbus_service_on_user_bus():
print('Mock service running. Ctrl+C to exit.')
await asyncio.sleep(2 << 30) # "a long time."


if __name__ == '__main__':
systemd_ctypes.run_async(main())
Loading

0 comments on commit 692597d

Please sign in to comment.