Skip to content

Commit

Permalink
core: AsyncCoreSocket and wrappers
Browse files Browse the repository at this point in the history
A POC implementation
  • Loading branch information
svinota committed Sep 17, 2024
1 parent ecc1bc9 commit e08b54a
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 218 deletions.
70 changes: 59 additions & 11 deletions pyroute2/iproute/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from pyroute2 import config
from pyroute2.common import basestring
from pyroute2.config import AF_BRIDGE
from pyroute2.lab import LAB_API
from pyroute2.netlink import (
NLM_F_ACK,
NLM_F_ATOMIC,
Expand All @@ -20,6 +19,7 @@
NLM_F_ROOT,
NLMSG_ERROR,
)
from pyroute2.netlink.core import SyncMixin
from pyroute2.netlink.exceptions import (
NetlinkDumpInterrupted,
NetlinkError,
Expand Down Expand Up @@ -106,6 +106,16 @@
log = logging.getLogger(__name__)


def compat_get_dump_filter(kwarg):
if 'match' in kwarg:
return kwarg.pop('match'), kwarg
else:
new_kwarg = {}
if 'family' in kwarg:
new_kwarg['family'] = kwarg.pop('family')
return kwarg, new_kwarg


def get_default_request_filters(mode, command):
filters = {
'link': [LinkFieldFilter(), LinkIPRouteFilter(command)],
Expand Down Expand Up @@ -1355,12 +1365,12 @@ def neigh(self, command, **kwarg):
dump_filter = None
msg = ndmsg.ndmsg()
if command == 'dump':
dump_filter, kwarg = get_dump_filter(kwarg)
dump_filter, kwarg = compat_get_dump_filter(kwarg)

request = (
RequestProcessor(context=kwarg, prime=kwarg)
.apply_filter(NeighbourFieldFilter())
.apply_filter(NeighbourIPRouteFilter(command))
.add_filter(NeighbourFieldFilter())
.add_filter(NeighbourIPRouteFilter(command))
.finalize()
)
msg_type, msg_flags = self.make_request_type(command, command_map)
Expand Down Expand Up @@ -2492,14 +2502,56 @@ class IPBatch(RTNL_API, IPBatchSocket):
pass


class IPRoute(LAB_API, RTNL_API, IPRSocket):
class AsyncIPRoute(RTNL_API, IPRSocket):
'''
Regular ordinary utility class, see RTNL API for the list of methods.
Regular ordinary async utility class, provides RTNL API using
IPRSocket as the transport level.
'''

pass


class IPRoute(SyncMixin, AsyncIPRoute):
'''
A synchronous version of AsyncIPRoute. All the same API, but
sync. Provides a legacy API for the old code that is not using
asyncio.
'''

def dump(self, groups=None):
groups_map = {
RTMGRP_LINK: [partial(self.link, 'dump')],
RTMGRP_IPV4_IFADDR: [partial(self.addr, 'dump', family=AF_INET)],
RTMGRP_IPV4_ROUTE: [partial(self.route, 'dump', family=AF_INET)],
}
for group, methods in groups_map.items():
if group & (groups if groups is not None else self.groups):
for method in methods:
for msg in method():
yield msg

def __getattribute__(self, name):
async_methods = ['addr', 'link', 'route']
symbol = super().__getattribute__(name)

def synchronize(*argv, **kwarg):
async def collect_dump():
return [i async for i in await symbol(*argv, **kwarg)]

async def collect_op():
return await symbol(*argv, **kwarg)

if argv[0] == 'dump':
task = collect_dump
else:
task = collect_op
return self.event_loop.run_until_complete(task())

if name in async_methods:
return synchronize
return symbol


class NetNS(IPRoute):

def __init__(
Expand All @@ -2511,11 +2563,7 @@ def __init__(
groups=RTMGRP_DEFAULTS,
):
super().__init__(
target=netns if netns is not None else target,
netns=netns,
flags=flags,
libc=libc,
groups=groups,
target=target, netns=netns, flags=flags, libc=libc, groups=groups
)


Expand Down
5 changes: 0 additions & 5 deletions pyroute2/ndb/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ def add_mock_netns(self, netns):
import ctypes.util
import logging
import logging.handlers
import sys
import threading

from pyroute2 import config
Expand Down Expand Up @@ -511,10 +510,6 @@ def __init__(
'nlm_generator': 1,
}
]
if sys.platform.startswith('linux'):
sources.append(
{'target': self.nsmanager, 'kind': 'nsmanager'}
)
elif not isinstance(sources, (list, tuple)):
raise ValueError('sources format not supported')

Expand Down
8 changes: 4 additions & 4 deletions pyroute2/ndb/objects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,8 @@ def __init__(
self.load_event.set()
self.load_debug = False
self.lock = threading.Lock()
self.object_data = RequestProcessor(
self.field_filter(), context=weakref.proxy(self)
)
self.object_data = RequestProcessor(context=weakref.proxy(self))
self.object_data.add_filter(self.field_filter())
self.kspec = self.schema.compiled[self.table]['idx']
self.knorm = self.schema.compiled[self.table]['norm_idx']
self.spec = self.schema.compiled[self.table]['all_names']
Expand Down Expand Up @@ -351,7 +350,8 @@ def __init__(
def new_spec(cls, spec, context=None, localhost=None):
if isinstance(spec, Record):
spec = spec._as_dict()
rp = RequestProcessor(cls.field_filter(), context=spec, prime=spec)
rp = RequestProcessor(context=spec, prime=spec)
rp.add_filter(cls.field_filter())
if isinstance(context, dict):
rp.update(context)
if 'target' not in rp and localhost is not None:
Expand Down
1 change: 1 addition & 0 deletions pyroute2/ndb/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ def receiver(self):
while self.state.get() not in ('stop', 'restart'):
try:
msg = tuple(self.nl.get())
self.log.debug(f'received message {msg}')
except Exception as e:
self.errors_counter += 1
self.log.error('source error: %s %s' % (type(e), e))
Expand Down
Loading

0 comments on commit e08b54a

Please sign in to comment.