Skip to content

Core API

SimNOS Class

The SimNOS class is the main entry point for interacting with SimNOS servers: start, stop, list.

Parameters:

- `inventory`: SimNOS inventory dictionary, or an OS path to a `.yaml`/`.yml` file with inventory data.
- `plugins`: Plugins to add extra devices/commands that are currently not easily supported.

Sample usage:

from simnos import SimNOS

net = SimNOS()
net.start()
Source code in simnos/core/simnos.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
class SimNOS:
    """
    SimNOS class is a main entry point to interact
    with SimNOS servers - start, stop, list.

    :param inventory: SimNOS inventory dictionary or
                      OS path to .yaml/.yml file with inventory data
    :param plugins: Plugins to add extra devices/commands
                    currently not supported easily.

    Sample usage:

    ```python
    from simnos import SimNOS

    net = SimNOS()
    net.start()
    ```
    """

    def __init__(
        self,
        inventory: dict | str | None = None,
        plugins: list | None = None,
    ) -> None:
        """
        Load and validate the inventory, build the host objects and
        register NOS plugins. No server is started here; call ``start``
        or use the instance as a context manager.

        :param inventory: inventory dictionary or path to a YAML file,
                          ``default_inventory`` is used when falsy
        :param plugins: list of NOS plugins, each a ``Nos`` instance,
                        a dictionary or a file path string
        """
        self.inventory: dict = inventory or default_inventory
        self.plugins: list = plugins or []

        self.hosts: dict[str, Host] = {}
        self.allocated_ports: set[int] = set()

        self.shell_plugins = shell_plugins
        self.nos_plugins = nos_plugins
        self.servers_plugins = servers_plugins

        self._load_inventory()
        self._init()
        self._register_nos_plugins()

    def __enter__(self):
        """
        Method to start the SimNOS servers when entering the context manager.
        It is meant to be used with the `with` statement.

        :return: this SimNOS instance
        """
        self.start()
        return self

    def __exit__(self, *args):
        """
        Method to stop the SimNOS servers when exiting the context manager.
        It is meant to be used with the `with` statement.

        Returns ``None``, so an exception raised inside the ``with`` body
        is not suppressed.
        """
        self.stop()

    def _is_inventory_in_yaml(self) -> bool:
        """Return True if the inventory is a string ending with .yaml or .yml."""
        return isinstance(self.inventory, str) and self.inventory.endswith((".yaml", ".yml"))

    def _load_inventory_yaml(self) -> None:
        """Helper method to replace the inventory path with the parsed YAML content."""
        with open(self.inventory, encoding="utf-8") as f:
            self.inventory = yaml.safe_load(f.read())

    def _load_inventory(self) -> None:
        """
        Helper method to load SimNOS inventory: read it from YAML if it
        is a path, merge the ``default`` section over the built-in
        defaults and validate the result with ``ModelSimnosInventory``.

        :raises ValueError: if the inventory is a string that does not
                            end with .yaml or .yml
        :raises TypeError: if the inventory is neither a dict nor a path
        """
        if self._is_inventory_in_yaml():
            self._load_inventory_yaml()

        if isinstance(self.inventory, str):
            raise ValueError(f"Inventory file must end with .yaml or .yml, got '{self.inventory}'")
        if not isinstance(self.inventory, dict):
            raise TypeError(f"Inventory must be a dict or a path to a YAML file, got {type(self.inventory).__name__}")

        merged_default = {
            **default_inventory["default"],
            **self.inventory.get("default", {}),
        }
        # Build a new top-level dict instead of assigning into the existing
        # one: self.inventory may be the caller's dict or the shared
        # module-level default_inventory, neither of which should be mutated.
        self.inventory = {**self.inventory, "default": merged_default}
        ModelSimnosInventory(**self.inventory)
        log.debug("SimNOS inventory validation succeeded")

    def _init(self) -> None:
        """
        Helper method to initiate host objects
        and store them in self.hosts, this
        method called automatically on SimNOS object instantiation.
        """
        for host_name, host_config in self.inventory["hosts"].items():
            # Deep copies keep per-host parameters independent from the
            # inventory and from each other; host values override defaults.
            params = {
                **copy.deepcopy(self.inventory["default"]),
                **copy.deepcopy(host_config),
            }
            port: int | list[int] = params.pop("port")
            replicas: int | None = params.pop("replicas", None)
            self._check_ports_and_replicas(port, replicas)
            self._instantiate_host_object(host_name, port, replicas, params)

    def _check_ports_and_replicas(self, port, replicas):
        """
        Method to check if the port and replicas are valid

        :param port: integer or list of two integers - port to allocate
        :param replicas: integer - number of hosts to create
        :raises ValueError: if port and replicas are inconsistent with
                            each other
        """
        if not replicas and isinstance(port, list):
            raise ValueError("If replicas is not set, port must be an integer.")
        if replicas and not isinstance(port, list):
            raise ValueError("If replicas is set, port must be a list of two integers.")
        if replicas and len(port) != 2:
            raise ValueError("If replicas is set, port must be a list of two integers.")
        if replicas and port[0] >= port[1]:
            raise ValueError("If replicas is set, port[0] must be less than port[1].")
        if replicas and replicas < 1:
            raise ValueError("If replicas is set, replicas must be greater than 0.")
        # The port range is inclusive on both ends, hence the +1.
        if replicas and port[1] - port[0] + 1 != replicas:
            raise ValueError("If replicas is set, port range must be equal to the number of replicas.")

    def _instantiate_host_object(self, host_name: str, port: int | list[int], replicas: int | None, params: dict):
        """
        Method that instantiate the host objects. It initializes the hosts
        with the corresponding name, port and network operating system

        :param host_name: string - name of the host
        :param port: integer or list of two integers - port to allocate
        :param replicas: integer - number of hosts to create
        :param params: dictionary - parameters to pass to
                                    the host like configurations
        """
        hosts_name, ports = self._get_hosts_and_ports(host_name, port, replicas)
        for h_name, p in zip(hosts_name, ports, strict=True):
            self._instantiate_single_host_object(h_name, p, params)

    def _get_hosts_and_ports(self, host_name: str, port: int | list[int], replicas: int | None = None):
        """
        Method to get hosts and ports correctly
        depending on the number of replicas (if exists).

        :param host_name: string - name of the host
        :param port: integer or list of two integers - port to allocate
        :param replicas: integer - number of hosts to create
        :return: tuple of (list of host names, list of ports); with
                 replicas the names are ``host_name`` suffixed with 0..replicas-1
        """
        if replicas:
            hosts_name = [f"{host_name}{i}" for i in range(replicas)]
            ports = list(range(port[0], port[1] + 1))
        else:
            hosts_name = [host_name]
            ports = [port]
        return hosts_name, ports

    def _instantiate_single_host_object(self, host: str, port: int, params: dict):
        """
        Method that instantiates a single host object.

        :param host: name of the host
        :param port: port to allocate
        :param params: parameters to pass to the host like configurations
        """
        self._allocate_port(port)
        self.hosts[host] = Host(name=host, port=port, simnos=self, **params)

    def _allocate_port(self, port: int | list[int]) -> None:
        """
        Method to allocate port for host

        :param port: integer or list of integers - every port given
                     is allocated
        """
        ports = [port] if isinstance(port, int) else port
        for p in ports:
            self._allocate_port_single(p)

    def _allocate_port_single(self, port: int) -> None:
        """
        Method to allocate single port for host.

        :param port: integer - port to allocate
        :raises ValueError: if the port is outside 1-65535 or was
                            already allocated by this SimNOS instance
        """
        if not (0 < port <= 65535):
            raise ValueError(f"Port {port} out of valid range (1-65535)")
        if port in self.allocated_ports:
            raise ValueError(f"Port {port} already in use")
        self.allocated_ports.add(port)

    def _get_hosts_as_list(self, hosts: str | list[str] | None = None) -> list[Host]:
        """
        Helper method to get hosts as list

        :param hosts: string or list of strings, all hosts when falsy
        :return: list of Host objects
        :raises KeyError: if a given name is not in self.hosts
        """
        if not hosts:
            hosts = list(self.hosts.keys())
        if isinstance(hosts, str):
            hosts = [hosts]
        return [self.hosts[host] for host in hosts]

    def start(
        self,
        hosts: str | list[str] | None = None,
        parallel: bool = False,
        workers: int | None = None,
    ) -> None:
        """
        Function to start NOS servers instances

        :param hosts: single or list of hosts to start by their name.
        :param parallel: if True, start hosts in parallel using threads.
        :param workers: max number of worker threads (default: min(32, host_count)).
        """
        hosts: list[Host] = self._get_hosts_as_list(hosts)
        self._execute_function_over_hosts(
            hosts,
            "start",
            host_running=False,
            parallel=parallel,
            workers=workers,
        )
        log.info(
            "The following devices have been initiated: %s",
            [host.name for host in hosts],
        )
        for host in hosts:
            log.info("Device %s is running on port %s", host.name, host.port)
            self._warn_security(host)

    # Global wall-clock budget for the entire stop() operation.
    # Covers host.stop() calls + safety-net thread join.
    _STOP_GLOBAL_DEADLINE = 60

    def stop(
        self,
        hosts: str | list[str] | None = None,
        parallel: bool = False,
        workers: int | None = None,
    ) -> None:
        """
        Function to stop NOS servers instances and join managed threads.

        Uses a global deadline (_STOP_GLOBAL_DEADLINE seconds) to bound the
        total wall-clock time.  If the deadline is exceeded, remaining hosts
        may be left running and a warning is logged.

        :param hosts: single or list of hosts to stop by their name.
        :param parallel: if True, stop hosts in parallel using threads.
        :param workers: max number of worker threads (default: min(32, host_count)).
        """
        deadline = time.monotonic() + self._STOP_GLOBAL_DEADLINE
        hosts: list[Host] = self._get_hosts_as_list(hosts)
        # Collect managed threads before stopping (Host.stop sets server to None)
        managed_threads = self._collect_server_threads(hosts)
        self._execute_function_over_hosts(
            hosts,
            "stop",
            host_running=True,
            parallel=parallel,
            workers=workers,
            deadline=deadline,
        )
        if managed_threads:
            # The join gets whatever is left of the global budget, capped
            # at the safety-net budget.
            remaining = max(0, deadline - time.monotonic())
            self._join_threads(managed_threads, timeout=min(self._SAFETY_NET_DEADLINE, remaining))

    def _collect_server_threads(self, hosts: list[Host]) -> list[threading.Thread]:
        """
        Collect all managed threads from host servers before stopping.

        :param hosts: list of Host objects, hosts without a server are skipped
        :return: flat list of the servers' ``managed_threads``
        """
        threads: list[threading.Thread] = []
        for host in hosts:
            if host.server is not None:
                threads.extend(host.server.managed_threads)
        return threads

    # Safety-net join budget: longer than per-server budget because this
    # covers ALL hosts after they have already been told to stop.
    _SAFETY_NET_DEADLINE = 15
    _SAFETY_NET_PER_THREAD = 5

    def _join_threads(
        self,
        threads: list[threading.Thread],
        timeout: float | None = None,
    ) -> None:
        """
        Join SimNOS-managed threads after all hosts are stopped.
        Server threads are already joined by TCPServerBase.stop();
        this is a safety net for any stragglers.

        :param threads: threads to join
        :param timeout: total join budget in seconds,
                        _SAFETY_NET_DEADLINE when None
        """
        total = timeout if timeout is not None else self._SAFETY_NET_DEADLINE
        alive = join_threads_with_deadline(threads, total, self._SAFETY_NET_PER_THREAD)
        if alive:
            log.warning("%d SimNOS thread(s) did not exit within timeout", len(alive))

    def _execute_function_over_hosts(
        self,
        hosts: list[Host],
        func: str,
        host_running: bool = True,
        parallel: bool = False,
        workers: int | None = None,
        deadline: float | None = None,
    ):
        """
        Function that executes a function like start or stop over
        the selected hosts.

        :param hosts: list of Hosts objects in which the function will
        be executed.
        :param func: name of the Host method to call, e.g. "start" or "stop".
        :param host_running: only hosts whose ``running`` attribute equals
        this value are processed.
        :param parallel: if True, execute in parallel using threads.
        :param workers: max number of worker threads.
        :param deadline: optional monotonic deadline; skip remaining hosts if exceeded.
        :raises ValueError: if a host does not belong to this SimNOS
        instance, or if workers is less than 1 on the parallel path.
        """
        known_hosts = list(self.hosts.values())
        for host in hosts:
            if host not in known_hosts:
                raise ValueError(f"Host {host} not found")
        targets = [h for h in hosts if h.running == host_running]
        if not parallel or len(targets) <= 1:
            for i, h in enumerate(targets):
                if deadline is not None and time.monotonic() >= deadline:
                    log.warning("Global stop deadline exceeded, %d host(s) not stopped", len(targets) - i)
                    break
                getattr(h, func)()
            return
        if workers is not None and workers < 1:
            raise ValueError(f"workers must be >= 1, got {workers}")
        max_workers = workers or min(32, len(targets))
        remaining = max(0, deadline - time.monotonic()) if deadline is not None else None
        ex = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        futures = [ex.submit(getattr(h, func)) for h in targets]
        timed_out = False
        try:
            for f in concurrent.futures.as_completed(futures, timeout=remaining):
                # result() re-raises any exception raised by the host call.
                f.result()
        except concurrent.futures.TimeoutError:
            # as_completed raises concurrent.futures.TimeoutError, which is
            # only an alias of the builtin TimeoutError from Python 3.11 on;
            # catching the concurrent.futures name works on 3.10 as well.
            timed_out = True
            log.warning("Global stop deadline exceeded during parallel %s", func)
        finally:
            if timed_out:
                # Do not block on calls that are still running; futures
                # that have not started yet are cancelled.
                ex.shutdown(wait=False, cancel_futures=True)
            else:
                ex.shutdown(wait=True)

    @staticmethod
    def _warn_security(host: Host) -> None:
        """
        Emit warnings for common security misconfigurations: default
        user/user credentials and a server address of 0.0.0.0.

        :param host: Host object to inspect
        """
        if host.username == "user" and host.password == "user":  # noqa: S105
            log.warning(
                "Device %s uses default credentials (user/user). "
                "Change username/password in the inventory for non-local use.",
                host.name,
            )
        address = host.server_inventory.get("configuration", {}).get("address", "")
        if address == "0.0.0.0":  # noqa: S104
            log.warning(
                "Device %s binds to 0.0.0.0 (all interfaces). Use 127.0.0.1 to restrict access to localhost only.",
                host.name,
            )

    def _register_nos_plugins(self) -> None:
        """
        Method to register NOS plugins with SimNOS object, all plugins
        must be registered before calling start method.

        Each plugin is stored in self.nos_plugins under its ``name``.

        :raises TypeError: if a plugin is not a Nos instance, dict or str
        """
        for plugin in self.plugins:
            if isinstance(plugin, Nos):
                nos_instance = plugin
            else:
                nos_instance = Nos()
                if isinstance(plugin, dict):
                    nos_instance.from_dict(plugin)
                elif isinstance(plugin, str):
                    nos_instance.from_file(plugin)
                else:
                    raise TypeError(f"Unsupported NOS type {type(plugin)}, supported str, dict or Nos")
            self.nos_plugins[nos_instance.name] = nos_instance

__enter__()

Method to start the SimNOS servers when entering the context manager. It is meant to be used with the `with` statement.

Source code in simnos/core/simnos.py
 96
 97
 98
 99
100
101
102
def __enter__(self):
    """
    Context-manager entry hook: bring the SimNOS servers up via
    ``start()`` and hand this same object back, so that it can be
    bound by ``with ... as net``.
    """
    self.start()
    return self

__exit__(*args)

Method to stop the SimNOS servers when exiting the context manager. It is meant to be used with the `with` statement.

Source code in simnos/core/simnos.py
104
105
106
107
108
109
def __exit__(self, *args):
    """
    Context-manager exit hook: shut the SimNOS servers down via
    ``stop()``. The exception details passed in ``args`` are ignored
    and nothing is returned.
    """
    self.stop()

start(hosts=None, parallel=False, workers=None)

Function to start NOS servers instances

Parameters:

- `hosts`: single or list of hosts to start by their name.
- `parallel`: if True, start hosts in parallel using threads.
- `workers`: max number of worker threads (default: min(32, host_count)).

Source code in simnos/core/simnos.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def start(
    self,
    hosts: str | list[str] | None = None,
    parallel: bool = False,
    workers: int | None = None,
) -> None:
    """
    Start the NOS server instances of the selected hosts.

    Only hosts that are not running yet are started; afterwards every
    selected host is logged with its port and checked for security
    misconfigurations.

    :param hosts: single or list of hosts to start by their name.
    :param parallel: if True, start hosts in parallel using threads.
    :param workers: max number of worker threads (default: min(32, host_count)).
    """
    selected = self._get_hosts_as_list(hosts)
    self._execute_function_over_hosts(
        selected, "start", host_running=False, parallel=parallel, workers=workers
    )

    device_names = [device.name for device in selected]
    log.info("The following devices have been initiated: %s", device_names)

    for device in selected:
        log.info("Device %s is running on port %s", device.name, device.port)
        self._warn_security(device)

stop(hosts=None, parallel=False, workers=None)

Function to stop NOS servers instances and join managed threads.

Uses a global deadline (_STOP_GLOBAL_DEADLINE seconds) to bound the total wall-clock time. If the deadline is exceeded, remaining hosts may be left running and a warning is logged.

Parameters:

- `hosts`: single or list of hosts to stop by their name.
- `parallel`: if True, stop hosts in parallel using threads.
- `workers`: max number of worker threads (default: min(32, host_count)).

Source code in simnos/core/simnos.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def stop(
    self,
    hosts: str | list[str] | None = None,
    parallel: bool = False,
    workers: int | None = None,
) -> None:
    """
    Function to stop NOS servers instances and join managed threads.

    Uses a global deadline (_STOP_GLOBAL_DEADLINE seconds) to bound the
    total wall-clock time.  If the deadline is exceeded, remaining hosts
    may be left running and a warning is logged.

    :param hosts: single or list of hosts to stop by their name.
    :param parallel: if True, stop hosts in parallel using threads.
    :param workers: max number of worker threads (default: min(32, host_count)).
    """
    deadline = time.monotonic() + self._STOP_GLOBAL_DEADLINE
    hosts: list[Host] = self._get_hosts_as_list(hosts)
    # Collect managed threads before stopping (Host.stop sets server to None)
    managed_threads = self._collect_server_threads(hosts)
    self._execute_function_over_hosts(
        hosts,
        "stop",
        host_running=True,
        parallel=parallel,
        workers=workers,
        deadline=deadline,
    )
    if managed_threads:
        remaining = max(0, deadline - time.monotonic())
        self._join_threads(managed_threads, timeout=min(self._SAFETY_NET_DEADLINE, remaining))

Host Class

Host class to build host instances to use with SIMNOS.

Source code in simnos/core/host.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class Host:
    """
    Host class to build host instances to use with SIMNOS.
    """

    def __init__(
        self,
        name: str,
        username: str,
        password: str,
        port: int,
        server: dict,
        shell: dict,
        nos: dict,
        simnos,
        platform: str | None = None,
        configuration_file: str | None = None,
    ) -> None:
        self.name: str = name
        self.server_inventory: dict = server
        self.shell_inventory: dict = shell
        self.nos_inventory: dict = nos
        self.username: str = username
        self.password: str = password
        self.port: int = port
        self.simnos = simnos  # SimNOS object
        self.shell_inventory["configuration"].setdefault("base_prompt", self.name)
        self.running = False
        self.server = None
        self.server_plugin = None
        self.shell_plugin = None
        self.nos_plugin = None
        self.nos = None
        self.platform: str | None = platform
        self.configuration_file: str | None = configuration_file

        if self.platform:
            self.nos_inventory["plugin"] = self.platform

        self._validate()

    def start(self):
        """Method to start server instance for this host."""
        self.server_plugin = self.simnos.servers_plugins[self.server_inventory["plugin"]]
        self.shell_plugin = self.simnos.shell_plugins[self.shell_inventory["plugin"]]
        if self.platform:
            self.nos_inventory["plugin"] = self.platform
        self.nos_plugin = self.simnos.nos_plugins.get(self.nos_inventory["plugin"], self.nos_inventory["plugin"])
        self.nos = (
            Nos(filename=self.nos_plugin, configuration_file=self.configuration_file)
            if not isinstance(self.nos_plugin, Nos)
            else self.nos_plugin
        )
        self.server = self.server_plugin(
            shell=self.shell_plugin,
            shell_configuration=self.shell_inventory["configuration"],
            nos=self.nos,
            nos_inventory_config=self.nos_inventory.get("configuration", {}),
            port=self.port,
            username=self.username,
            password=self.password,
            **self.server_inventory["configuration"],
        )
        self.server.start()
        self.running = True

    def stop(self):
        """Method to stop server instance for this host."""
        self.server.stop()
        self.server = None
        self.running = False

    def _validate(self):
        """Validate that the host has the required attributes using pydantic"""
        if self.platform:
            self._check_if_platform_is_supported(self.platform)
        ModelHost(**self.__dict__)

    def _check_if_platform_is_supported(self, platform: str):
        """Check if the platform is supported"""
        if platform not in available_platforms:
            msg = f"Platform {platform} is not supported by SIMNOS. Supported platforms are: {available_platforms}"
            raise ValueError(msg)

start()

Method to start server instance for this host.

Source code in simnos/core/host.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def start(self):
    """
    Build and launch the server for this host.

    Plugins are looked up on the owning SimNOS object by the ``plugin``
    keys of the server, shell and NOS inventories; the host is marked
    as running once the server has been started.
    """
    registry = self.simnos
    self.server_plugin = registry.servers_plugins[self.server_inventory["plugin"]]
    self.shell_plugin = registry.shell_plugins[self.shell_inventory["plugin"]]

    if self.platform:
        self.nos_inventory["plugin"] = self.platform

    # An unregistered plugin name is kept as-is and treated as a file name.
    nos_key = self.nos_inventory["plugin"]
    self.nos_plugin = registry.nos_plugins.get(nos_key, nos_key)
    if isinstance(self.nos_plugin, Nos):
        self.nos = self.nos_plugin
    else:
        self.nos = Nos(filename=self.nos_plugin, configuration_file=self.configuration_file)

    shell_settings = self.shell_inventory["configuration"]
    nos_settings = self.nos_inventory.get("configuration", {})
    self.server = self.server_plugin(
        shell=self.shell_plugin,
        shell_configuration=shell_settings,
        nos=self.nos,
        nos_inventory_config=nos_settings,
        port=self.port,
        username=self.username,
        password=self.password,
        **self.server_inventory["configuration"],
    )
    self.server.start()
    self.running = True

stop()

Method to stop server instance for this host.

Source code in simnos/core/host.py
81
82
83
84
85
def stop(self):
    """
    Method to stop server instance for this host.

    Calling it on a host that has no server (never started or already
    stopped) is a no-op instead of an AttributeError.
    """
    if self.server is not None:
        self.server.stop()
        self.server = None
    self.running = False

Nos Class

Base class to build NOS plugins instances to use with SIMNOS.

Source code in simnos/core/nos.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class Nos:
    """
    Base class to build NOS plugins instances to use with SIMNOS.
    """

    def __init__(
        self,
        name: str = "SimNOS",
        commands: dict | None = None,
        initial_prompt: str = "SimNOS>",
        filename: str | list[str] | None = None,
        configuration_file: str | None = None,
        dict_args: dict | None = None,
    ) -> None:
        """
        Method to instantiate Nos Instance

        :param name: NOS plugin name
        :param commands: dictionary of NOS commands
        :param initial_prompt: NOS initial prompt
        """
        self.name = name
        self.commands = commands or {}
        self.initial_prompt = initial_prompt
        self.auth: str | None = None
        self.enable_prompt: str | None = None
        self.config_prompt: str | None = None
        self.device = None
        self.configuration_file = configuration_file
        if isinstance(filename, str):
            self.from_file(filename)
        elif isinstance(filename, list):
            for file in filename:
                self.from_file(file)
        elif dict_args:
            self.from_dict(dict_args)

        self.validate()

    def validate(self) -> None:
        """
        Method to validate NOS attributes: commands, name,
        initial prompt - using Pydantic models,
        raises ValidationError on failure.
        """
        ModelNosAttributes(**self.__dict__)
        log.debug("%s NOS attributes validation succeeded", self.name)

    def from_dict(self, data: dict) -> None:
        """
        Method to build NOS from dictionary data.

        Sample NOS dictionary::

            nos_plugin_dict = {
                "name": "MySimNOSPlugin",
                "initial_prompt": "{base_prompt}>",
                "commands": {
                    "terminal width 511": {
                        "output": "",
                        "help": "Set terminal width to 511",
                        "prompt": "{base_prompt}>",
                    },
                    "terminal length 0": {
                        "output": "",
                        "help": "Set terminal length to 0",
                        "prompt": "{base_prompt}>",
                    },
                    "show clock": {
                        "output": "MySimNOSPlugin system time is 00:00:00",
                        "help": "Show system time",
                        "prompt": "{base_prompt}>",
                    },
                },
            }

        :param data: NOS dictionary
        """
        self.name = data.get("name", self.name)
        self.commands.update(data.get("commands", self.commands))
        self.initial_prompt = data.get("initial_prompt", self.initial_prompt)
        self.auth = data.get("auth", self.auth)
        self.enable_prompt = data.get("enable_prompt", self.enable_prompt)
        self.config_prompt = data.get("config_prompt", self.config_prompt)

    def _from_yaml(self, filepath: str) -> None:
        """
        Method to build NOS from YAML file.

        Sample NOS YAML file content::

            name: "MySimNOSPlugin"
            initial_prompt: "{base_prompt}>"
            commands:
                terminal width 511: {
                    "output": "",
                    "help": "Set terminal width to 511",
                    "prompt": "{base_prompt}>",
                }
                terminal length 0: {
                    "output": "",
                    "help": "Set terminal length to 0",
                    "prompt": "{base_prompt}>",
                }
                show clock: {
                    "output": "MySimNOSPlugin system time is 00:00:00",
                    "help": "Show system time",
                    "prompt": "{base_prompt}>",
                }

        :param filepath: OS path to YAML file with NOS data
        """
        with open(filepath, encoding="utf-8") as f:
            self.from_dict(yaml.safe_load(f))

    def _from_module(self, filename: str) -> None:
        """
        Method to import NOS data from python file or python module.

        Loads from the .py file using the recipe:
        https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly

        Sample Python NOS plugin file::

            name = "MySimNOSPlugin"

            INITIAL_PROMPT = "{base_prompt}>"

            commands = {
                "terminal width 511": {
                    "output": "",
                    "help": "Set terminal width to 511",
                    "prompt": "{base_prompt}>",
                },
                "terminal length 0": {
                    "output": "",
                    "help": "Set terminal length to 0",
                    "prompt": "{base_prompt}>",
                },
                "show clock": {
                    "output": "MySimNOSPlugin system time is 00:00:00",
                    "help": "Show system time",
                    "prompt": "{base_prompt}>",
                },
            }

        :param filename: OS path string to Python .py file
        """
        spec = importlib.util.spec_from_file_location("module.name", filename)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        self.name = getattr(module, "NAME", self.name)
        self.commands.update(getattr(module, "commands", self.commands))
        self.initial_prompt = getattr(module, "INITIAL_PROMPT", self.initial_prompt)
        self.auth = getattr(module, "AUTH", self.auth)
        self.enable_prompt = getattr(module, "ENABLE_PROMPT", self.enable_prompt)
        self.config_prompt = getattr(module, "CONFIG_PROMPT", self.config_prompt)
        classname = getattr(module, "DEVICE_NAME", None)
        if classname is None:
            log.warning("Module '%s' does not define DEVICE_NAME; device will be None", filename)
        else:
            device_class = getattr(module, classname, None)
            if device_class is None:
                raise AttributeError(
                    f"Module '{filename}' defines DEVICE_NAME='{classname}' but class '{classname}' was not found"
                )
            configuration_file = self.configuration_file or getattr(module, "DEFAULT_CONFIGURATION", None)
            self.device = device_class(configuration_file=configuration_file)

    def from_file(self, filename: str) -> None:
        """
        Method to load NOS from YAML or Python file

        :param filename: OS path string to `.yaml/.yml` or `.py` file with NOS data
        """
        if not self.is_file_ending_correct(filename):
            raise ValueError(f'Unsupported "{filename}" file extension. Supported: .py, .yml, .yaml')
        if not os.path.isfile(filename):
            raise FileNotFoundError(filename)
        if filename.endswith((".yaml", ".yml")):
            self._from_yaml(filename)
        elif filename.endswith(".py"):
            self._from_module(filename)

    def is_file_ending_correct(self, filename: str) -> bool:
        """
        Method to check if file extension is supported.
        Supported types are: .yaml, .yml and .py
        """
        return filename.endswith((".yaml", ".yml", ".py"))

__init__(name='SimNOS', commands=None, initial_prompt='SimNOS>', filename=None, configuration_file=None, dict_args=None)

Method to instantiate Nos Instance

Parameters:

- `name`: NOS plugin name.
- `commands`: dictionary of NOS commands.
- `initial_prompt`: NOS initial prompt.

Source code in simnos/core/nos.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def __init__(
    self,
    name: str = "SimNOS",
    commands: dict | None = None,
    initial_prompt: str = "SimNOS>",
    filename: str | list[str] | None = None,
    configuration_file: str | None = None,
    dict_args: dict | None = None,
) -> None:
    """
    Method to instantiate Nos Instance

    :param name: NOS plugin name
    :param commands: dictionary of NOS commands
    :param initial_prompt: NOS initial prompt
    """
    self.name = name
    self.commands = commands or {}
    self.initial_prompt = initial_prompt
    self.auth: str | None = None
    self.enable_prompt: str | None = None
    self.config_prompt: str | None = None
    self.device = None
    self.configuration_file = configuration_file
    if isinstance(filename, str):
        self.from_file(filename)
    elif isinstance(filename, list):
        for file in filename:
            self.from_file(file)
    elif dict_args:
        self.from_dict(dict_args)

    self.validate()

from_dict(data)

Method to build NOS from dictionary data.

Sample NOS dictionary::

nos_plugin_dict = {
    "name": "MySimNOSPlugin",
    "initial_prompt": "{base_prompt}>",
    "commands": {
        "terminal width 511": {
            "output": "",
            "help": "Set terminal width to 511",
            "prompt": "{base_prompt}>",
        },
        "terminal length 0": {
            "output": "",
            "help": "Set terminal length to 0",
            "prompt": "{base_prompt}>",
        },
        "show clock": {
            "output": "MySimNOSPlugin system time is 00:00:00",
            "help": "Show system time",
            "prompt": "{base_prompt}>",
        },
    },
}

:param data: NOS dictionary

Source code in simnos/core/nos.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def from_dict(self, data: dict) -> None:
    """
    Populate NOS attributes from a dictionary.

    Keys missing from ``data`` leave the current attribute value in place.
    ``commands`` are merged into the existing commands dictionary rather
    than replacing it.

    Sample NOS dictionary::

        nos_plugin_dict = {
            "name": "MySimNOSPlugin",
            "initial_prompt": "{base_prompt}>",
            "commands": {
                "terminal width 511": {
                    "output": "",
                    "help": "Set terminal width to 511",
                    "prompt": "{base_prompt}>",
                },
                "terminal length 0": {
                    "output": "",
                    "help": "Set terminal length to 0",
                    "prompt": "{base_prompt}>",
                },
                "show clock": {
                    "output": "MySimNOSPlugin system time is 00:00:00",
                    "help": "Show system time",
                    "prompt": "{base_prompt}>",
                },
            },
        }

    :param data: NOS dictionary
    """
    self.name = data.get("name", self.name)

    # Merge, do not replace: falling back to the current commands makes
    # the update a no-op when "commands" is absent.
    incoming_commands = data.get("commands", self.commands)
    self.commands.update(incoming_commands)

    # Remaining scalar attributes share the same "override if present" rule.
    for attribute in ("initial_prompt", "auth", "enable_prompt", "config_prompt"):
        setattr(self, attribute, data.get(attribute, getattr(self, attribute)))

from_file(filename)

Method to load NOS from YAML or Python file

:param filename: OS path string to .yaml/.yml or .py file with NOS data

Source code in simnos/core/nos.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def from_file(self, filename: str) -> None:
    """
    Load NOS data from a YAML or Python file.

    :param filename: OS path string to `.yaml/.yml` or `.py` file with NOS data
    :raises ValueError: if ``is_file_ending_correct`` rejects ``filename``
    :raises FileNotFoundError: if ``filename`` is not an existing file
    """
    extension_ok = self.is_file_ending_correct(filename)
    if not extension_ok:
        raise ValueError(f'Unsupported "{filename}" file extension. Supported: .py, .yml, .yaml')

    file_exists = os.path.isfile(filename)
    if not file_exists:
        raise FileNotFoundError(filename)

    # Choose the loader by suffix: YAML is tested first, then Python.
    # Any other suffix that passed the check above is silently ignored.
    if filename.endswith((".yaml", ".yml")):
        loader = self._from_yaml
    elif filename.endswith(".py"):
        loader = self._from_module
    else:
        return
    loader(filename)

is_file_ending_correct(filename)

Method to check if file extension is supported. Supported types are: .yaml, .yml and .py

Source code in simnos/core/nos.py
253
254
255
256
257
258
def is_file_ending_correct(self, filename: str) -> bool:
    """
    Tell whether ``filename`` ends with a supported extension.

    Supported types are: .yaml, .yml and .py

    :param filename: file name or path to check
    :return: ``True`` when the name ends with one of the supported suffixes
    """
    supported_suffixes = (".yaml", ".yml", ".py")
    return any(filename.endswith(suffix) for suffix in supported_suffixes)

validate()

Method to validate NOS attributes (commands, name, initial prompt) using Pydantic models. Raises ValidationError on failure.

Source code in simnos/core/nos.py
108
109
110
111
112
113
114
115
def validate(self) -> None:
    """
    Check the NOS attributes - commands, name, initial prompt -
    against the ``ModelNosAttributes`` Pydantic model.

    :raises ValidationError: when the attributes do not satisfy the model
    """
    # Every instance attribute is handed to the model as a keyword argument.
    attributes = self.__dict__
    ModelNosAttributes(**attributes)
    log.debug("%s NOS attributes validation succeeded", self.name)

TCPServerBase Class

Bases: ABC

Base class for a TCP Server. It provides the methods to start and stop the server.

Source code in simnos/core/servers.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class TCPServerBase(ABC):
    """
    Base class for a TCP Server.
    It provides the methods to start and stop the server.

    A single listen thread accepts connections through a selector and
    spawns one thread per accepted client, each running
    ``connection_function``. Subclasses must implement
    ``connection_function``.
    """

    def __init__(self, address="localhost", port=6000, timeout=1):
        """
        Initialize the server with the address and port
        and the safety-net timeout for select() (seconds).

        No socket is created here; that happens in ``start()``.

        :param address: address the listen socket is bound to
        :param port: port the listen socket is bound to
        :param timeout: timeout passed to ``selector.select()`` in the listen loop
        """
        self.address = address
        self.port = port
        self.timeout = timeout
        # Set while the server is running; also handed to every connection thread.
        self._is_running = threading.Event()
        # Listen socket, created by _bind_sockets().
        self._socket = None
        # Not used inside this class; presumably set by subclasses - TODO confirm.
        self.client_shell = None
        self._listen_thread = None
        # Threads started for accepted clients; pruned by the listen loop.
        self._connection_threads = []
        # Socket pair used by stop() to wake the selector in _listen().
        self._wakeup_r: socket.socket | None = None
        self._wakeup_w: socket.socket | None = None
        self._selector: selectors.BaseSelector | None = None

    def start(self):
        """
        Start Server which distributes the connections.
        It handles the creation of the socket, binding to the address and port,
        and starting the listening thread.

        Does nothing if the server is already running. If any step raises,
        the partially created resources are released, the running flag is
        cleared and the exception is re-raised.
        """
        if self._is_running.is_set():
            return

        self._is_running.set()
        try:
            self._bind_sockets()
            self._socket.listen()

            # Wakeup pair: stop() writes to _wakeup_w, the selector watches _wakeup_r.
            self._wakeup_r, self._wakeup_w = socket.socketpair()
            self._wakeup_r.setblocking(False)

            # The "listen"/"wakeup" tags are what _listen() dispatches on.
            self._selector = selectors.DefaultSelector()
            self._selector.register(self._socket, selectors.EVENT_READ, data="listen")
            self._selector.register(self._wakeup_r, selectors.EVENT_READ, data="wakeup")

            self._listen_thread = threading.Thread(target=self._listen)
            self._listen_thread.start()
        except Exception:
            self._cleanup_resources()
            self._is_running.clear()
            raise

    def _bind_sockets(self):
        """
        Create the non-blocking listen socket and bind it to
        ``(self.address, self.port)``.

        SO_REUSEADDR is always set. SO_REUSEPORT is additionally set
        only when ``sys.platform`` is ``"linux"``.
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

        if sys.platform in ["linux"]:
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, True)

        self._socket.setblocking(False)
        self._socket.bind((self.address, self.port))

    @property
    def managed_threads(self) -> list[threading.Thread]:
        """Return all threads managed by this server (listen + connections)."""
        # Copy so callers cannot mutate the internal list.
        threads = list(self._connection_threads)
        if self._listen_thread is not None:
            threads.append(self._listen_thread)
        return threads

    def stop(self):
        """
        It stops the server joining the threads
        and closing the corresponding sockets.

        Does nothing if the server is not running. Resources are released
        even if joining the connection threads raises.
        """
        if not self._is_running.is_set():
            return

        self._is_running.clear()

        # Wake the selector in _listen() so it exits without waiting for its timeout.
        if self._wakeup_w is not None:
            with contextlib.suppress(OSError):
                self._wakeup_w.send(b"\x00")

        # Bounded wait: the listen thread is given at most 2 seconds.
        self._listen_thread.join(timeout=2)

        try:
            # join_threads_with_deadline is defined elsewhere; presumably it
            # returns the threads still alive after the deadline - TODO confirm.
            alive = join_threads_with_deadline(self._connection_threads, _STOP_DEADLINE, _PER_THREAD_JOIN)
            if alive:
                log.warning(
                    "%d connection thread(s) did not exit within %ds",
                    len(alive),
                    _STOP_DEADLINE,
                )
        finally:
            self._cleanup_resources()

    def _cleanup_resources(self):
        """
        Safely close selector, wakeup socketpair, and listen socket.

        Errors raised while closing are suppressed, and every closed
        attribute is reset to ``None``.
        """
        if self._selector is not None:
            with contextlib.suppress(Exception):
                self._selector.close()
            self._selector = None
        if self._socket is not None:
            with contextlib.suppress(OSError):
                self._socket.close()
            self._socket = None
        for sock in (self._wakeup_r, self._wakeup_w):
            if sock is not None:
                with contextlib.suppress(OSError):
                    sock.close()
        self._wakeup_r = self._wakeup_w = None

    def _listen(self):
        """
        Wait for connections using selectors.
        A wakeup socketpair allows stop() to unblock select() instantly
        instead of waiting for the timeout to expire.

        Runs in the listen thread until the running flag is cleared, a
        wakeup event arrives, or select() fails. Each accepted client is
        handed to ``connection_function`` in its own thread.
        """
        while self._is_running.is_set():
            try:
                # select(timeout) is a safety net. Normal shutdown is
                # signalled via the wakeup socket and returns immediately.
                events = self._selector.select(timeout=self.timeout)
            except (OSError, ValueError):
                break  # selector was closed

            # Check wakeup first: if shutdown and accept fire simultaneously,
            # skip accept and exit immediately.
            for key, _ in events:
                if key.data == "wakeup":
                    return

            # No wakeup — process accept events.
            for key, _ in events:
                if key.data == "listen":
                    try:
                        client, _ = self._socket.accept()
                    except BlockingIOError:
                        continue  # spurious wakeup
                    except OSError:
                        # socket was closed (shutdown path); this leaves only
                        # the for-loop, the while condition decides the rest.
                        break

                    # listen socket is non-blocking, so accepted client
                    # inherits that mode (OS-dependent). Restore blocking
                    # before handing to connection_function (e.g. paramiko).
                    client.setblocking(True)

                    connection_thread = threading.Thread(
                        target=self.connection_function,
                        args=(client, self._is_running),
                    )
                    connection_thread.start()
                    self._connection_threads.append(connection_thread)

            # Prune finished threads to prevent unbounded growth
            self._connection_threads = [t for t in self._connection_threads if t.is_alive()]

    @abstractmethod
    def connection_function(self, client, is_running):
        """
        This abstract method is called when a new connection
        is made. The implementation should handle the
        connection afterwards.

        :param client: the accepted client socket, switched to blocking mode
        :param is_running: the server's running ``threading.Event``
        """

managed_threads property

Return all threads managed by this server (listen + connections).

__init__(address='localhost', port=6000, timeout=1)

Initialize the server with the address and port and the safety-net timeout for select() (seconds).

Source code in simnos/core/servers.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(self, address="localhost", port=6000, timeout=1):
    """
    Record where to listen and prepare the state of a server
    that has not been started yet.

    :param address: address the server is meant to listen on
    :param port: port the server is meant to listen on
    :param timeout: safety-net timeout for select(), in seconds
    """
    # Caller-supplied configuration.
    self.address, self.port, self.timeout = address, port, timeout

    # Lifecycle state: running flag plus the threads the server owns.
    self._is_running = threading.Event()
    self._listen_thread = None
    self._connection_threads = []

    # Network resources; all of them start out as None.
    self._socket = None
    self._selector: selectors.BaseSelector | None = None
    self._wakeup_r: socket.socket | None = None
    self._wakeup_w: socket.socket | None = None

    self.client_shell = None

connection_function(client, is_running) abstractmethod

This abstract method is called when a new connection is made. The implementation should handle the connection afterwards.

Source code in simnos/core/servers.py
213
214
215
216
217
218
219
@abstractmethod
def connection_function(self, client, is_running):
    """
    Handle one newly accepted connection.

    Abstract hook: the server calls it when a new connection is
    made, and the implementation takes over the connection from there.
    """

start()

Start Server which distributes the connections. It handles the creation of the socket, binding to the address and port, and starting the listening thread.

Source code in simnos/core/servers.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def start(self):
    """
    Bring the server up: create and bind the listen socket, set up the
    wakeup socket pair and the selector, then launch the listening thread.

    Calling it on a server that is already running does nothing. If any
    step fails, the resources are cleaned up, the running flag is
    cleared and the exception is re-raised.
    """
    if self._is_running.is_set():
        return

    self._is_running.set()
    try:
        self._bind_sockets()
        self._socket.listen()

        # Socket pair used to interrupt the selector from another thread.
        wakeup_pair = socket.socketpair()
        self._wakeup_r, self._wakeup_w = wakeup_pair
        self._wakeup_r.setblocking(False)

        # Stored on self before registering, so cleanup can close it on failure.
        selector = selectors.DefaultSelector()
        self._selector = selector
        for sock, tag in ((self._socket, "listen"), (self._wakeup_r, "wakeup")):
            selector.register(sock, selectors.EVENT_READ, data=tag)

        listener = threading.Thread(target=self._listen)
        self._listen_thread = listener
        listener.start()
    except Exception:
        self._cleanup_resources()
        self._is_running.clear()
        raise

stop()

Stops the server by joining its threads and closing the corresponding sockets.

Source code in simnos/core/servers.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def stop(self):
    """
    Shut the server down: signal the listen thread, join the threads
    and close the corresponding sockets.

    Calling it on a server that is not running does nothing.
    """
    if not self._is_running.is_set():
        return

    self._is_running.clear()

    # Send one byte on the wakeup socket; a failed send is ignored.
    waker = self._wakeup_w
    if waker is not None:
        with contextlib.suppress(OSError):
            waker.send(b"\x00")

    # The listen thread gets a bounded wait of 2 seconds.
    self._listen_thread.join(timeout=2)

    try:
        stragglers = join_threads_with_deadline(
            self._connection_threads,
            _STOP_DEADLINE,
            _PER_THREAD_JOIN,
        )
        if stragglers:
            log.warning(
                "%d connection thread(s) did not exit within %ds",
                len(stragglers),
                _STOP_DEADLINE,
            )
    finally:
        # Resources are released even when joining raises.
        self._cleanup_resources()