Skip to content

Built-in Plugins API

NOS Plugins

NOS plugins are at the heart of SIMNOS, they are what enables to realize its full potential.

Cisco IOS

NOS module for Cisco IOS

CiscoIOS

Bases: BaseDevice

Class that keeps track of the state of the Cisco IOS device.

Source code in simnos/plugins/nos/platforms_py/cisco_ios.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class CiscoIOS(BaseDevice):
    """
    Class that keeps track of the state of the Cisco IOS device.
    """

    def make_show_clock(self, base_prompt, current_prompt, command):
        "Return String in format '*11:54:03.018 UTC Sat Apr 16 2022'"
        return time.strftime("*%H:%M:%S.000 %Z %a %b %d %Y")

    def make_show_running_config(self, base_prompt, current_prompt, command):
        "Return String of running configuration"
        return self.render("cisco_ios/show_running-config.j2", base_prompt=base_prompt)

    def make_show_version(self, base_prompt, current_prompt, command):
        "Return String of system hardware and software status"
        return self.render("cisco_ios/show_version.j2", base_prompt=base_prompt)

make_show_clock(base_prompt, current_prompt, command)

Return String in format '*11:54:03.018 UTC Sat Apr 16 2022'

Source code in simnos/plugins/nos/platforms_py/cisco_ios.py
25
26
27
def make_show_clock(self, base_prompt, current_prompt, command):
    "Return String in format '*11:54:03.018 UTC Sat Apr 16 2022'"
    return time.strftime("*%H:%M:%S.000 %Z %a %b %d %Y")

make_show_running_config(base_prompt, current_prompt, command)

Return String of running configuration

Source code in simnos/plugins/nos/platforms_py/cisco_ios.py
29
30
31
def make_show_running_config(self, base_prompt, current_prompt, command):
    "Return String of running configuration"
    return self.render("cisco_ios/show_running-config.j2", base_prompt=base_prompt)

make_show_version(base_prompt, current_prompt, command)

Return String of system hardware and software status

Source code in simnos/plugins/nos/platforms_py/cisco_ios.py
33
34
35
def make_show_version(self, base_prompt, current_prompt, command):
    "Return String of system hardware and software status"
    return self.render("cisco_ios/show_version.j2", base_prompt=base_prompt)

Servers Plugins

Servers Plugins acts as an access layer, simulating device connections.

ParamikoSshServer

Bases: TCPServerBase

Class to implement an SSH server using paramiko as the SSH connection library.

Source code in simnos/plugins/servers/ssh_server_paramiko.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
class ParamikoSshServer(TCPServerBase):
    """
    Class to implement an SSH server using paramiko
    as the SSH connection library.
    """

    _moduli_loaded: bool | None = None
    _default_key: paramiko.rsakey.RSAKey | None = None
    _default_key_lock: threading.Lock = threading.Lock()
    _KNOWN_KEY_TYPES = (
        "ssh-rsa",
        "ssh-ed25519",
        "ssh-dss",
        "ecdsa-sha2-",
        "sk-ssh-ed25519",
        "sk-ecdsa-sha2-",
    )

    def __init__(
        self,
        shell: type,
        nos: Nos,
        nos_inventory_config: dict,
        port: int,
        username: str,
        password: str,
        ssh_key_file: str | None = None,
        ssh_key_file_password: str | None = None,
        ssh_banner: str = "SIMNOS Paramiko SSH Server",
        shell_configuration: dict | None = None,
        address: str = "127.0.0.1",
        timeout: int = 1,
        watchdog_interval: float = 1,
        authorized_keys: str | None = None,
    ):
        super().__init__(address=address, port=port, timeout=timeout)

        self.nos: Nos = nos
        self.nos_inventory_config: dict = nos_inventory_config
        self.shell: type = shell
        self.shell_configuration: dict = shell_configuration or {}
        self.ssh_banner: str = ssh_banner
        self.username: str = username
        self.password: str = password
        self.watchdog_interval: float = watchdog_interval
        self._authorized_keys = self._load_authorized_keys(authorized_keys) if authorized_keys else None

        if ssh_key_file:
            self._ssh_server_key: paramiko.rsakey.RSAKey = paramiko.RSAKey.from_private_key_file(
                ssh_key_file, ssh_key_file_password
            )
        else:
            with ParamikoSshServer._default_key_lock:
                if ParamikoSshServer._default_key is None:
                    ParamikoSshServer._default_key = paramiko.RSAKey.generate(2048)
            self._ssh_server_key = ParamikoSshServer._default_key
            log.warning(
                "Using auto-generated SSH host key. This key is not persisted and "
                "will change on restart. Provide a custom key via ssh_key_file "
                "for non-local use."
            )

        # Load SSH moduli once for DH Group Exchange support in server mode.
        # Result is cached at the class level so subsequent instances skip the file I/O.
        if ParamikoSshServer._moduli_loaded is None:
            ParamikoSshServer._moduli_loaded = paramiko.Transport.load_server_moduli()

    @staticmethod
    def _load_authorized_keys(path: str) -> set[tuple[str, str]]:
        """Parse an OpenSSH authorized_keys file.

        Supports bare key lines and lines with leading options.
        Skips comment lines, blank lines, and @marker lines.
        File not found / permission errors propagate as-is (fail-fast).

        Returns a set of (key_type, base64_data) tuples.
        """
        keys: set[tuple[str, str]] = set()
        with open(path) as fh:
            for line in fh:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                if line.startswith("@"):
                    log.warning("Skipping unsupported marker line: %s", line)
                    continue
                parts = line.split()
                for i, part in enumerate(parts):
                    if any(part.startswith(prefix) for prefix in ParamikoSshServer._KNOWN_KEY_TYPES):
                        if i + 1 < len(parts):
                            keys.add((part, parts[i + 1]))
                        else:
                            log.warning("Key type found but base64 data missing, skipping line: %s", line)
                        break
                else:
                    log.warning("No known key type found, skipping line: %s", line)
        return keys

    def watchdog(
        self,
        is_running: threading.Event,
        run_srv: threading.Event,
        session: paramiko.Transport,
        shell: Any,
    ):
        """
        Method to monitor server liveness and recover where possible.
        """
        while run_srv.is_set():
            if not session.is_alive():
                log.warning("ParamikoSshServer.watchdog - session not alive, stopping shell")
                shell.stop()
                break

            if not is_running.is_set():
                shell.stop()

            time.sleep(self.watchdog_interval)

    def _read_channel_line(self, channel, echo: bool = True) -> str:
        """
        Read a single line from the channel byte-by-byte.

        :param channel: paramiko Channel
        :param echo: if True, echo each byte back to the client
        :return: the line read (without trailing CR/LF)
        """
        channel.settimeout(self.timeout)
        buf = b""
        while True:
            try:
                byte = channel.recv(1)
            except TimeoutError:
                continue
            if byte in (b"", None):
                break
            if byte in (b"\r", b"\n"):
                if echo:
                    channel.sendall(b"\r\n")
                break
            if echo:
                channel.sendall(byte)
            buf += byte
        return buf.decode("utf-8", errors="replace")

    def _channel_login(self, channel) -> bool:
        """
        Perform channel-level login for auth_none platforms (e.g. Dell PowerConnect).

        Sends "User Name:" / "Password:" prompts and validates credentials.

        :param channel: paramiko Channel
        :return: True if login succeeded, False otherwise
        """
        channel.sendall(b"\r\nUser Name:")
        username = self._read_channel_line(channel, echo=True)
        channel.sendall(b"\r\nPassword:")
        password = self._read_channel_line(channel, echo=False)
        channel.sendall(b"\r\n")

        if username == self.username and password == self.password:
            log.debug("Channel login succeeded for user %s", username)
            return True

        log.warning("Channel login failed for user %s", username)
        return False

    def connection_function(self, client: socket.socket, is_running: threading.Event):
        shell_replied_event = threading.Event()
        run_srv = threading.Event()
        run_srv.set()

        # determine if this NOS requires auth_none
        allow_auth_none = getattr(self.nos, "auth", None) == "none"

        # create the SSH transport object
        session = paramiko.Transport(client)
        if not self._moduli_loaded:
            session.disabled_algorithms = _DISABLED_GEX_ALGORITHMS
        session.add_server_key(self._ssh_server_key)

        # create the server
        server = ParamikoSshServerInterface(
            ssh_banner=self.ssh_banner,
            username=self.username,
            password=self.password,
            allow_auth_none=allow_auth_none,
            authorized_keys=self._authorized_keys,
        )

        # start the SSH server
        session.start_server(server=server)

        # create the channel and get the stdio
        channel = session.accept()
        if channel is None:
            log.warning("session.accept() returned None, closing transport")
            session.close()
            return

        # For auth_none platforms (e.g. Dell PowerConnect), perform channel-level
        # login before starting the shell.  When publickey auth is also configured,
        # clients that authenticate via publickey bypass this channel-level login
        # intentionally — SSH-level publickey auth already verified the identity.
        if server.auth_method_used == "none" and not self._channel_login(channel):
            log.warning("Channel login failed, closing connection")
            channel.close()
            session.close()
            return

        channel_stdio = channel.makefile("rw")

        # create stdio for the shell
        shell_stdin, shell_stdout = TapIO(run_srv), TapIO(run_srv)

        # start intermediate thread to tap into
        # the channel_stdio->shell_stdin bytes stream
        channel_to_shell_tapper = threading.Thread(
            target=channel_to_shell_tap,
            args=(channel_stdio, shell_stdin, shell_replied_event, run_srv),
        )
        channel_to_shell_tapper.start()

        # start intermediate thread to tap into
        # the shell_stdout->channel_stdio bytes stream
        shell_to_channel_tapper = threading.Thread(
            target=shell_to_channel_tap,
            args=(channel_stdio, shell_stdout, shell_replied_event, run_srv),
        )
        shell_to_channel_tapper.start()

        # create the client shell
        client_shell = self.shell(
            stdin=shell_stdin,
            stdout=shell_stdout,
            nos=self.nos,
            nos_inventory_config=self.nos_inventory_config,
            is_running=is_running,
            **self.shell_configuration,
        )

        # start watchdog thread
        watchdog_thread = threading.Thread(target=self.watchdog, args=(is_running, run_srv, session, client_shell))
        watchdog_thread.start()

        # running this command will block this function until shell exits
        client_shell.start()
        log.debug("ParamikoSshServer.connection_function stopped shell thread")

        # kill this server threads - watchdog, TapIO,
        # shell_to_channel_tapper and channel_to_shell_tapper
        run_srv.clear()
        log.debug("ParamikoSshServer.connection_function stopped server threads")

        # After execution continues, we can close the session
        session.close()
        log.debug("ParamikoSshServer.connection_function closed transport %s", session)

watchdog(is_running, run_srv, session, shell)

Method to monitor server liveness and recover where possible.

Source code in simnos/plugins/servers/ssh_server_paramiko.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def watchdog(
    self,
    is_running: threading.Event,
    run_srv: threading.Event,
    session: paramiko.Transport,
    shell: Any,
):
    """
    Method to monitor server liveness and recover where possible.
    """
    while run_srv.is_set():
        if not session.is_alive():
            log.warning("ParamikoSshServer.watchdog - session not alive, stopping shell")
            shell.stop()
            break

        if not is_running.is_set():
            shell.stop()

        time.sleep(self.watchdog_interval)

Shell Plugins

Shell Plugins act as a plumbing between servers plugins and NOS plugins, gluing, connecting them together.

CMDShell

Bases: Cmd

Custom shell class to interact with NOS.

Source code in simnos/plugins/shell/cmd_shell.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
class CMDShell(Cmd):
    """
    Custom shell class to interact with NOS.
    """

    use_rawinput = False

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        stdin,
        stdout,
        nos,
        nos_inventory_config,
        base_prompt,
        is_running,
        intro="Custom SSH Shell",
        ruler="",
        completekey="tab",
        newline="\r\n",
    ):
        self.nos: Nos = nos
        self.ruler = ruler
        self.intro = intro
        self.base_prompt = base_prompt
        self.newline = newline
        self.prompt = nos.initial_prompt.format(base_prompt=base_prompt)
        self.is_running = is_running

        # form commands
        self.commands = {
            **copy.deepcopy(BASIC_COMMANDS),
            **copy.deepcopy(nos.commands or {}),
            **copy.deepcopy(nos_inventory_config.get("commands", {})),
        }
        # call the base constructor of cmd.Cmd, with our own stdin and stdout
        super().__init__(
            completekey=completekey,
            stdin=stdin,
            stdout=stdout,
        )

    def start(self):
        """Method to start the shell"""
        self.cmdloop()

    def stop(self):
        """Method to stop the shell"""
        self.stdin.write("exit" + self.newline)

    def writeline(self, value):
        """Method to write a line to stdout with newline at the end"""
        for line in str(value).splitlines():
            self.stdout.write(line + self.newline)

    def emptyline(self):
        """This method to do nothing if empty line entered"""

    def reload_commands(self, changed_files: list):
        """Method to reload commands"""
        for file in changed_files:
            self.nos.from_file(file)
            self.commands.update(self.nos.commands)

    def precmd(self, line):
        """Method to return line before processing the command"""
        if os.environ.get("SIMNOS_RELOAD_COMMANDS"):
            changed_files = get_files_changed(nos.__path__[0])
            if changed_files:
                log.debug("Reloading... Files changed: %s", changed_files)
                self.reload_commands(changed_files)
        return line

    # pylint: disable=unused-argument
    def postcmd(self, stop, line):
        """Method to return stop value to stop the shell"""
        return stop

    # pylint: disable=unused-argument
    def do_help(self, arg):
        """Method to return help for commands"""
        lines = {}  # dict of {cmd: cmd_help}
        width = 0  # record longest command width for padding
        # form help for all commands
        for cmd, cmd_data in self.commands.items():
            # skip special commands
            if cmd.startswith("_") and cmd.endswith("_"):
                continue
            # skip commands that does not match current prompt
            if not self._check_prompt(cmd_data.get("prompt")):
                continue
            lines[cmd] = cmd_data.get("help", "")
            width = max(width, len(cmd))
        # form help lines
        help_msg = []
        for k, v in lines.items():
            padding = " " * (width - len(k)) + "  "
            help_msg.append(f"{k}{padding}{v}")
        self.writeline(self.newline.join(help_msg))

    def _check_prompt(self, prompt_: str | list[str]):
        """
        Helper method to check if prompt_ matches current prompt

        :param prompt_: (string or None)  prompt to check
        """
        # prompt_ is None if no 'prompt' key defined for command
        if prompt_ is None:
            return True
        if isinstance(prompt_, str):
            return self.prompt == prompt_.format(base_prompt=self.base_prompt)
        return any(self.prompt == i.format(base_prompt=self.base_prompt) for i in prompt_)

    # pylint: disable=too-many-branches
    def default(self, line):
        """Method called if no do_xyz methods found"""
        log.debug("shell.default '%s' running command '%s'", self.base_prompt, [line])
        ret = self.commands["_default_"]["output"]
        try:
            cmd_data = self.commands[line]
            if "alias" in cmd_data:
                cmd_data = {**self.commands[cmd_data["alias"]], **cmd_data}
            if self._check_prompt(cmd_data.get("prompt")):
                ret = cmd_data["output"]
                if callable(ret):
                    ret = ret(
                        self.nos.device,
                        base_prompt=self.base_prompt,
                        current_prompt=self.prompt,
                        command=line,
                    )
                    if isinstance(ret, dict):
                        if "new_prompt" in ret:
                            self.prompt = ret["new_prompt"].format(base_prompt=self.base_prompt)
                        ret = ret["output"]
                if "new_prompt" in cmd_data:
                    self.prompt = cmd_data["new_prompt"].format(base_prompt=self.base_prompt)
            else:
                log.warning(
                    "'%s' command prompt '%s' not matching current prompt '%s'",
                    line,
                    (
                        ", ".join(cmd_data.get("prompt", []))
                        if isinstance(cmd_data.get("prompt"), list)
                        else cmd_data.get("prompt", "")
                    ),
                    self.prompt,
                )
        except KeyError:
            log.error("shell.default '%s' command '%s' not found", self.base_prompt, [line])
            if callable(ret):
                ret = "An error occurred related to the command function"
        # pylint: disable=broad-except
        except ValueError:
            log.error("Output is still a callable")
            ret = "An error occurred"
        except Exception as e:
            log.error("An error occurred: %s", str(e))
            ret = traceback.format_exc()
            ret = ret.replace("\n", self.newline)
        # check if need to exit
        if ret is True or not self.is_running.is_set():
            return True
        if ret is not None:
            try:
                ret = ret.format(base_prompt=self.base_prompt)
            except KeyError:
                log.error("Error in formatting output")
            self.writeline(ret)
        return False

default(line)

Method called if no do_xyz methods found

Source code in simnos/plugins/shell/cmd_shell.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def default(self, line):
    """Method called if no do_xyz methods found"""
    log.debug("shell.default '%s' running command '%s'", self.base_prompt, [line])
    ret = self.commands["_default_"]["output"]
    try:
        cmd_data = self.commands[line]
        if "alias" in cmd_data:
            cmd_data = {**self.commands[cmd_data["alias"]], **cmd_data}
        if self._check_prompt(cmd_data.get("prompt")):
            ret = cmd_data["output"]
            if callable(ret):
                ret = ret(
                    self.nos.device,
                    base_prompt=self.base_prompt,
                    current_prompt=self.prompt,
                    command=line,
                )
                if isinstance(ret, dict):
                    if "new_prompt" in ret:
                        self.prompt = ret["new_prompt"].format(base_prompt=self.base_prompt)
                    ret = ret["output"]
            if "new_prompt" in cmd_data:
                self.prompt = cmd_data["new_prompt"].format(base_prompt=self.base_prompt)
        else:
            log.warning(
                "'%s' command prompt '%s' not matching current prompt '%s'",
                line,
                (
                    ", ".join(cmd_data.get("prompt", []))
                    if isinstance(cmd_data.get("prompt"), list)
                    else cmd_data.get("prompt", "")
                ),
                self.prompt,
            )
    except KeyError:
        log.error("shell.default '%s' command '%s' not found", self.base_prompt, [line])
        if callable(ret):
            ret = "An error occurred related to the command function"
    # pylint: disable=broad-except
    except ValueError:
        log.error("Output is still a callable")
        ret = "An error occurred"
    except Exception as e:
        log.error("An error occurred: %s", str(e))
        ret = traceback.format_exc()
        ret = ret.replace("\n", self.newline)
    # check if need to exit
    if ret is True or not self.is_running.is_set():
        return True
    if ret is not None:
        try:
            ret = ret.format(base_prompt=self.base_prompt)
        except KeyError:
            log.error("Error in formatting output")
        self.writeline(ret)
    return False

do_help(arg)

Method to return help for commands

Source code in simnos/plugins/shell/cmd_shell.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def do_help(self, arg):
    """Method to return help for commands"""
    lines = {}  # dict of {cmd: cmd_help}
    width = 0  # record longest command width for padding
    # form help for all commands
    for cmd, cmd_data in self.commands.items():
        # skip special commands
        if cmd.startswith("_") and cmd.endswith("_"):
            continue
        # skip commands that does not match current prompt
        if not self._check_prompt(cmd_data.get("prompt")):
            continue
        lines[cmd] = cmd_data.get("help", "")
        width = max(width, len(cmd))
    # form help lines
    help_msg = []
    for k, v in lines.items():
        padding = " " * (width - len(k)) + "  "
        help_msg.append(f"{k}{padding}{v}")
    self.writeline(self.newline.join(help_msg))

emptyline()

This method to do nothing if empty line entered

Source code in simnos/plugins/shell/cmd_shell.py
82
83
def emptyline(self):
    """This method to do nothing if empty line entered"""

postcmd(stop, line)

Method to return stop value to stop the shell

Source code in simnos/plugins/shell/cmd_shell.py
101
102
103
def postcmd(self, stop, line):
    """Method to return stop value to stop the shell"""
    return stop

precmd(line)

Method to return line before processing the command

Source code in simnos/plugins/shell/cmd_shell.py
91
92
93
94
95
96
97
98
def precmd(self, line):
    """Method to return line before processing the command"""
    if os.environ.get("SIMNOS_RELOAD_COMMANDS"):
        changed_files = get_files_changed(nos.__path__[0])
        if changed_files:
            log.debug("Reloading... Files changed: %s", changed_files)
            self.reload_commands(changed_files)
    return line

reload_commands(changed_files)

Method to reload commands

Source code in simnos/plugins/shell/cmd_shell.py
85
86
87
88
89
def reload_commands(self, changed_files: list):
    """Method to reload commands"""
    for file in changed_files:
        self.nos.from_file(file)
        self.commands.update(self.nos.commands)

start()

Method to start the shell

Source code in simnos/plugins/shell/cmd_shell.py
69
70
71
def start(self):
    """Method to start the shell"""
    self.cmdloop()

stop()

Method to stop the shell

Source code in simnos/plugins/shell/cmd_shell.py
73
74
75
def stop(self):
    """Method to stop the shell"""
    self.stdin.write("exit" + self.newline)

writeline(value)

Method to write a line to stdout with newline at the end

Source code in simnos/plugins/shell/cmd_shell.py
77
78
79
80
def writeline(self, value):
    """Method to write a line to stdout with newline at the end"""
    for line in str(value).splitlines():
        self.stdout.write(line + self.newline)

Tape Plugins

Idea - Tape Plugins will allow to record interactions with real devices and build NOS plugins automatically using gathered data.