コンテンツにスキップ

Built-in Plugins API

NOS Plugins

NOS plugins are at the heart of SIMNOS; they are what enable it to realize its full potential.

Cisco IOS

NOS module for Cisco IOS

CiscoIOS

Bases: BaseDevice

Class that keeps track of the state of the Cisco IOS device.

Source code in simnos/plugins/nos/platforms_py/cisco_ios.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class CiscoIOS(BaseDevice):
    """
    Class that keeps track of the state of the Cisco IOS device.

    Each ``make_*`` method builds the output for one ``show`` command.
    """

    def make_show_clock(self, base_prompt, current_prompt, command):
        """Return the local time formatted like '*11:54:03.000 UTC Sat Apr 16 2022'."""
        # The millisecond field is a fixed ".000" literal in the format string.
        clock_format = "*%H:%M:%S.000 %Z %a %b %d %Y"
        return time.strftime(clock_format)

    def make_show_running_config(self, base_prompt, current_prompt, command):
        """Return the running configuration rendered from its template."""
        template = "cisco_ios/show_running-config.j2"
        return self.render(template, base_prompt=base_prompt)

    def make_show_version(self, base_prompt, current_prompt, command):
        """Return the system hardware and software status rendered from its template."""
        template = "cisco_ios/show_version.j2"
        return self.render(template, base_prompt=base_prompt)

make_show_clock(base_prompt, current_prompt, command)

Return String in format '*11:54:03.018 UTC Sat Apr 16 2022'

Source code in simnos/plugins/nos/platforms_py/cisco_ios.py
25
26
27
def make_show_clock(self, base_prompt, current_prompt, command):
    """Return the local time formatted like '*11:54:03.000 UTC Sat Apr 16 2022'."""
    # The millisecond field is a fixed ".000" literal in the format string.
    clock_format = "*%H:%M:%S.000 %Z %a %b %d %Y"
    return time.strftime(clock_format)

make_show_running_config(base_prompt, current_prompt, command)

Return String of running configuration

Source code in simnos/plugins/nos/platforms_py/cisco_ios.py
29
30
31
def make_show_running_config(self, base_prompt, current_prompt, command):
    """Return the running configuration rendered from its template."""
    template = "cisco_ios/show_running-config.j2"
    return self.render(template, base_prompt=base_prompt)

make_show_version(base_prompt, current_prompt, command)

Return String of system hardware and software status

Source code in simnos/plugins/nos/platforms_py/cisco_ios.py
33
34
35
def make_show_version(self, base_prompt, current_prompt, command):
    """Return the system hardware and software status rendered from its template."""
    template = "cisco_ios/show_version.j2"
    return self.render(template, base_prompt=base_prompt)

Server Plugins

Server plugins act as an access layer, simulating device connections.

ParamikoSshServer

Bases: TCPServerBase

Class to implement an SSH server using paramiko as the SSH connection library.

Source code in simnos/plugins/servers/ssh_server_paramiko.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
class ParamikoSshServer(TCPServerBase):
    """
    Class to implement an SSH server using paramiko
    as the SSH connection library.
    """

    _moduli_loaded: bool | None = None
    _default_key: paramiko.rsakey.RSAKey | None = None
    _default_key_lock: threading.Lock = threading.Lock()
    _KNOWN_KEY_TYPES = (
        "ssh-rsa",
        "ssh-ed25519",
        "ssh-dss",
        "ecdsa-sha2-",
        "sk-ssh-ed25519",
        "sk-ecdsa-sha2-",
    )

    def __init__(
        self,
        shell: type,
        nos: Nos,
        nos_inventory_config: dict,
        port: int,
        username: str,
        password: str,
        ssh_key_file: str | None = None,
        ssh_key_file_password: str | None = None,
        ssh_banner: str = "SIMNOS Paramiko SSH Server",
        shell_configuration: dict | None = None,
        address: str = "127.0.0.1",
        timeout: int = 1,
        watchdog_interval: float = 1,
        authorized_keys: str | None = None,
    ):
        """
        Store the server settings and prepare the SSH host key.

        :param shell: shell class instantiated for every accepted connection
        :param nos: NOS object handed to the shell
        :param nos_inventory_config: inventory configuration handed to the shell
        :param port: TCP port passed to the base server
        :param username: user name accepted at login
        :param password: password accepted at login
        :param ssh_key_file: path to an RSA private key used as host key; when
            omitted, a key generated once and cached on the class is used
        :param ssh_key_file_password: password for ``ssh_key_file``
        :param ssh_banner: banner passed to the SSH server interface
        :param shell_configuration: extra keyword arguments for the shell
        :param address: address passed to the base server
        :param timeout: timeout passed to the base server
        :param watchdog_interval: upper bound, in seconds, on the watchdog sleep
        :param authorized_keys: path to an OpenSSH authorized_keys file, if any
        """
        super().__init__(address=address, port=port, timeout=timeout)

        # Login credentials and presentation.
        self.username: str = username
        self.password: str = password
        self.ssh_banner: str = ssh_banner

        # Inputs needed to build a shell for each connection.
        self.shell: type = shell
        self.shell_configuration: dict = shell_configuration if shell_configuration else {}
        self.nos: Nos = nos
        self.nos_inventory_config: dict = nos_inventory_config
        self.watchdog_interval: float = watchdog_interval

        if authorized_keys:
            self._authorized_keys = self._load_authorized_keys(authorized_keys)
        else:
            self._authorized_keys = None

        cls = ParamikoSshServer
        if not ssh_key_file:
            # No key supplied: generate one under the lock and share it
            # between all instances through the class attribute.
            with cls._default_key_lock:
                if cls._default_key is None:
                    cls._default_key = paramiko.RSAKey.generate(2048)
            self._ssh_server_key: paramiko.rsakey.RSAKey = cls._default_key
            log.warning(
                "Using auto-generated SSH host key. This key is not persisted and "
                "will change on restart. Provide a custom key via ssh_key_file "
                "for non-local use."
            )
        else:
            self._ssh_server_key = paramiko.RSAKey.from_private_key_file(
                ssh_key_file, ssh_key_file_password
            )

        # Load SSH moduli once for DH Group Exchange support in server mode.
        # The outcome is cached on the class so later instances skip the file I/O.
        if cls._moduli_loaded is None:
            cls._moduli_loaded = paramiko.Transport.load_server_moduli()

    @staticmethod
    def _load_authorized_keys(path: str) -> set[tuple[str, str]]:
        """Parse an OpenSSH authorized_keys file.

        Supports bare key lines and lines with leading options.
        Skips comment lines, blank lines, and @marker lines.
        File not found / permission errors propagate as-is (fail-fast).

        The file is decoded as UTF-8 with undecodable bytes replaced, so a
        stray non-UTF-8 byte in a key comment cannot abort loading and the
        result no longer depends on the platform's default encoding.  Key
        types and base64 data are plain ASCII and are unaffected.

        :param path: path to the authorized_keys file
        :return: a set of (key_type, base64_data) tuples
        """
        keys: set[tuple[str, str]] = set()
        with open(path, encoding="utf-8", errors="replace") as fh:
            for line in fh:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                if line.startswith("@"):
                    log.warning("Skipping unsupported marker line: %s", line)
                    continue
                # NOTE(review): fields are split on whitespace, so a quoted
                # option value containing a space followed by a key-type-like
                # token would be mistaken for the key itself.
                parts = line.split()
                for i, part in enumerate(parts):
                    # str.startswith accepts the tuple of prefixes directly.
                    if part.startswith(ParamikoSshServer._KNOWN_KEY_TYPES):
                        if i + 1 < len(parts):
                            keys.add((part, parts[i + 1]))
                        else:
                            log.warning("Key type found but base64 data missing, skipping line: %s", line)
                        break
                else:
                    log.warning("No known key type found, skipping line: %s", line)
        return keys

    def watchdog(
        self,
        is_running: threading.Event,
        run_srv: threading.Event,
        session: paramiko.Transport,
        shell: Any,
    ):
        """
        Poll the connection state and stop the shell once it should end.

        The loop runs while ``run_srv`` is set and exits early when the
        transport is no longer alive or ``is_running`` has been cleared.
        ``shell.stop()`` is called on every exit path.

        :param is_running: event whose clearing ends the loop
        :param run_srv: event that keeps the loop running while set
        :param session: transport polled with ``is_alive()``
        :param shell: object whose ``stop()`` is called when the loop ends
        """
        while run_srv.is_set():
            session_alive = session.is_alive()
            if not session_alive:
                log.warning("ParamikoSshServer.watchdog - session not alive, stopping shell")
            if not session_alive or not is_running.is_set():
                break

            # The sleep is capped at _SHUTDOWN_TIMEOUT regardless of the
            # configured watchdog interval.
            time.sleep(min(self.watchdog_interval, _SHUTDOWN_TIMEOUT))

        shell.stop()

    def _read_channel_line(
        self,
        channel,
        echo: bool = True,
        skip_lf: bool = False,
    ) -> tuple[str, bool]:
        """
        Read one line from the channel, a byte at a time.

        Read timeouts are retried; any other read error or an empty read ends
        the line early and returns whatever was collected so far.

        :param channel: paramiko Channel
        :param echo: if True, echo each byte back to the client
        :param skip_lf: if True, consume a leading LF left over from a previous CR
        :return: (line without trailing CR/LF, whether next call should skip LF)
        """
        channel.settimeout(self.timeout)
        collected = bytearray()
        pending_lf = skip_lf
        while True:
            try:
                chunk = channel.recv(1)
            except TimeoutError:
                # Nothing arrived within the timeout: keep waiting.
                continue
            except (OSError, EOFError, paramiko.SSHException):
                break
            if not chunk:
                break
            if pending_lf:
                # Only the very next byte can be the LF half of a CR LF pair.
                pending_lf = False
                if chunk == b"\n":
                    continue
            if chunk in (b"\r", b"\n"):
                if echo:
                    channel.sendall(b"\r\n")
                # After a CR the caller should swallow a following LF.
                return collected.decode("utf-8", errors="replace"), chunk == b"\r"
            if echo:
                channel.sendall(chunk)
            collected += chunk
        return collected.decode("utf-8", errors="replace"), False

    def _channel_login(self, channel) -> tuple[bool, bool]:
        """
        Run the in-channel login used by auth_none platforms (e.g. Dell PowerConnect).

        Prompts with "User Name:" and "Password:" and compares the answers
        with the configured credentials.  The password is read without echo.

        :param channel: paramiko Channel
        :return: (authenticated, skip_lf) — skip_lf should be forwarded to
                 channel_to_shell_tap so it can consume a trailing LF after
                 the final CR of the password line.
        """
        channel.sendall(b"\r\nUser Name:")
        entered_user, lf_pending = self._read_channel_line(channel, echo=True, skip_lf=False)
        channel.sendall(b"\r\nPassword:")
        entered_password, lf_pending = self._read_channel_line(channel, echo=False, skip_lf=lf_pending)
        channel.sendall(b"\r\n")

        authenticated = entered_user == self.username and entered_password == self.password
        if authenticated:
            log.debug("Channel login succeeded for user %s", entered_user)
        else:
            log.warning("Channel login failed for user %s", entered_user)
        return authenticated, lf_pending

    def connection_function(self, client: socket.socket, is_running: threading.Event):
        """
        Serve one accepted client connection from handshake to shell exit.

        Steps: build the paramiko transport, start the SSH server, wait for
        the client to open a channel, run the channel-level login when the
        client authenticated with method "none", then start the two tap
        threads, the shell and the watchdog.  The call blocks in
        ``client_shell.start()`` until the shell exits.  On every exit path
        ``run_srv`` is cleared and the transport is closed.

        :param client: accepted client socket wrapped by the transport
        :param is_running: event checked while waiting for a channel and
            passed on to the shell and the watchdog
        """
        shell_replied_event = threading.Event()
        # run_srv is the per-connection stop flag shared by the tap threads
        # and the watchdog; it is cleared in the finally block below.
        run_srv = threading.Event()
        run_srv.set()

        # determine if this NOS requires auth_none
        allow_auth_none = getattr(self.nos, "auth", None) == "none"

        # create the SSH transport object
        session = paramiko.Transport(client)
        # When the moduli were not loaded, apply _DISABLED_GEX_ALGORITHMS
        # (defined elsewhere; presumably turns off DH group exchange).
        if not self._moduli_loaded:
            session.disabled_algorithms = _DISABLED_GEX_ALGORITHMS
        session.add_server_key(self._ssh_server_key)
        session.banner_timeout = _SHUTDOWN_TIMEOUT
        session.handshake_timeout = _SHUTDOWN_TIMEOUT

        try:
            # create the server
            server = ParamikoSshServerInterface(
                ssh_banner=self.ssh_banner,
                username=self.username,
                password=self.password,
                allow_auth_none=allow_auth_none,
                authorized_keys=self._authorized_keys,
            )

            # start the SSH server — may raise SSHException if the client
            # disconnects during handshake or if stop() races with accept.
            try:
                session.start_server(server=server)
            except paramiko.SSHException as e:
                log.debug("SSH handshake failed (likely client disconnect or stop): %s", e)
                return

            # wait for the client to open a channel; accept() is retried in
            # _SHUTDOWN_TIMEOUT slices so a stop request or a dead transport
            # ends the wait
            channel = None
            while channel is None and is_running.is_set() and session.is_alive():
                channel = session.accept(_SHUTDOWN_TIMEOUT)
            if channel is None:
                log.warning("session.accept() returned None or server stopping, closing transport")
                return

            # For auth_none platforms (e.g. Dell PowerConnect), perform channel-level
            # login before starting the shell.  When publickey auth is also configured,
            # clients that authenticate via publickey bypass this channel-level login
            # intentionally — SSH-level publickey auth already verified the identity.
            skip_lf = False
            if server.auth_method_used == "none":
                authenticated, skip_lf = self._channel_login(channel)
                if not authenticated:
                    log.warning("Channel login failed, closing connection")
                    return

            channel.settimeout(self.timeout)

            # create stdio for the shell
            shell_stdin, shell_stdout = TapIO(run_srv), TapIO(run_srv)

            # start intermediate thread to tap into
            # the channel->shell_stdin bytes stream
            channel_to_shell_tapper = threading.Thread(
                target=channel_to_shell_tap,
                args=(channel, shell_stdin, shell_replied_event, run_srv),
                kwargs={"initial_skip_lf": skip_lf, "shell_stdout": shell_stdout},
                daemon=True,
            )
            channel_to_shell_tapper.start()

            # start intermediate thread to tap into
            # the shell_stdout->channel bytes stream
            shell_to_channel_tapper = threading.Thread(
                target=shell_to_channel_tap,
                args=(channel, shell_stdout, shell_replied_event, run_srv),
                daemon=True,
            )
            shell_to_channel_tapper.start()

            # create the client shell
            client_shell = self.shell(
                stdin=shell_stdin,
                stdout=shell_stdout,
                nos=self.nos,
                nos_inventory_config=self.nos_inventory_config,
                is_running=is_running,
                **self.shell_configuration,
            )

            # start watchdog thread
            watchdog_thread = threading.Thread(
                target=self.watchdog, args=(is_running, run_srv, session, client_shell), daemon=True
            )
            watchdog_thread.start()

            # running this command will block this function until shell exits
            client_shell.start()
            log.debug("ParamikoSshServer.connection_function stopped shell thread")

        finally:
            # Stop all server threads
            run_srv.clear()
            log.debug("ParamikoSshServer.connection_function stopped server threads")

            session.close()
            log.debug("ParamikoSshServer.connection_function closed transport %s", session)

watchdog(is_running, run_srv, session, shell)

Method to monitor session and server liveness and stop the shell once either is no longer running.

Source code in simnos/plugins/servers/ssh_server_paramiko.py
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def watchdog(
    self,
    is_running: threading.Event,
    run_srv: threading.Event,
    session: paramiko.Transport,
    shell: Any,
):
    """
    Poll the connection state and stop the shell once it should end.

    The loop runs while ``run_srv`` is set and exits early when the
    transport is no longer alive or ``is_running`` has been cleared.
    ``shell.stop()`` is called on every exit path.

    :param is_running: event whose clearing ends the loop
    :param run_srv: event that keeps the loop running while set
    :param session: transport polled with ``is_alive()``
    :param shell: object whose ``stop()`` is called when the loop ends
    """
    while run_srv.is_set():
        session_alive = session.is_alive()
        if not session_alive:
            log.warning("ParamikoSshServer.watchdog - session not alive, stopping shell")
        if not session_alive or not is_running.is_set():
            break

        # The sleep is capped at _SHUTDOWN_TIMEOUT regardless of the
        # configured watchdog interval.
        time.sleep(min(self.watchdog_interval, _SHUTDOWN_TIMEOUT))

    shell.stop()

TelnetServer

Bases: TCPServerBase

Telnet server plugin using raw sockets.

Follows the same plugin architecture as ParamikoSshServer: TCPServerBase → connection_function() → TapIO → CMDShell.

Source code in simnos/plugins/servers/telnet_server.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
class TelnetServer(TCPServerBase):
    """
    Telnet server plugin using raw sockets.

    Follows the same plugin architecture as ParamikoSshServer:
    TCPServerBase → connection_function() → TapIO → CMDShell.
    """

    def __init__(
        self,
        shell: type,
        nos: Nos,
        nos_inventory_config: dict,
        port: int,
        username: str,
        password: str,
        banner: str = "SIMNOS Telnet Server",
        shell_configuration: dict | None = None,
        address: str = "127.0.0.1",
        timeout: int = 1,
        watchdog_interval: float = 1,
    ):
        """
        Store the server settings and warn about non-loopback bindings.

        :param shell: shell class instantiated for every accepted connection
        :param nos: NOS object handed to the shell
        :param nos_inventory_config: inventory configuration handed to the shell
        :param port: TCP port passed to the base server
        :param username: user name accepted at login
        :param password: password accepted at login
        :param banner: text sent to the client before the login prompt
        :param shell_configuration: extra keyword arguments for the shell
        :param address: address passed to the base server
        :param timeout: timeout passed to the base server
        :param watchdog_interval: upper bound, in seconds, on the watchdog sleep
        """
        super().__init__(address=address, port=port, timeout=timeout)

        # Login credentials and greeting.
        self.username: str = username
        self.password: str = password
        self.banner: str = banner

        # Inputs needed to build a shell for each connection.
        self.shell: type = shell
        self.shell_configuration: dict = shell_configuration if shell_configuration else {}
        self.nos: Nos = nos
        self.nos_inventory_config: dict = nos_inventory_config
        self.watchdog_interval: float = watchdog_interval

        binds_locally = _is_loopback(address)
        if not binds_locally:
            log.warning(
                "Telnet transmits all data (including credentials) in plaintext. "
                "Binding to non-local address %s is insecure. "
                "Use SSH (ParamikoSshServer) for non-local access.",
                address,
            )

    # ------------------------------------------------------------------
    # IAC handling
    # ------------------------------------------------------------------

    def _recv_byte(self, sock: socket.socket) -> bytes | None:
        """Return the next data byte, consuming any IAC sequences on the way.

        :param sock: client socket
        :return: a single data byte, or None when the peer closed the connection
        """
        while True:
            data = sock.recv(1)
            if not data:
                return None
            if data[0] != IAC:
                return data

            # An IAC was read: the next byte says which command follows.
            command = sock.recv(1)
            if not command:
                return None
            code = command[0]
            if code == IAC:
                # IAC IAC is the escape for a literal 0xFF data byte.
                return b"\xff"
            if code in (WILL, WONT, DO, DONT):
                # Three-byte negotiation: IAC <verb> <option>.
                option = sock.recv(1)
                if option:
                    self._handle_negotiation(sock, code, option[0])
            elif code == SB:
                # Subnegotiation payload is skipped up to IAC SE.
                self._skip_subnegotiation(sock)
            # Any other command (NOP, GA, ...) is ignored; read on.

    def _handle_negotiation(self, sock: socket.socket, cmd: int, opt: int) -> None:
        """Answer one Telnet negotiation command, if an answer is due.

        :param sock: client socket the reply is written to
        :param cmd: negotiation verb received (DO, DONT, WILL or WONT)
        :param opt: option code the verb refers to
        """
        if cmd == DO:
            # SGA and ECHO get no reply here; every other option is refused.
            reply = None if opt in (SGA, ECHO) else WONT
        elif cmd == WILL:
            # Only NAWS is accepted from the client.
            reply = DO if opt == NAWS else DONT
        else:
            # DONT / WONT need no response.
            reply = None

        if reply is not None:
            sock.sendall(bytes([IAC, reply, opt]))

    def _skip_subnegotiation(self, sock: socket.socket) -> None:
        """Discard subnegotiation data up to and including IAC SE.

        Returns silently on EOF.  An IAC followed by anything other than SE
        (including the IAC IAC escape) is skipped and reading continues.

        :param sock: client socket
        """
        while True:
            data = sock.recv(1)
            if not data:
                return
            if data[0] != IAC:
                continue
            follower = sock.recv(1)
            # EOF or the closing SE both end the skip.
            if not follower or follower[0] == SE:
                return

    # ------------------------------------------------------------------
    # Line reading and authentication
    # ------------------------------------------------------------------

    def _read_line(self, sock: socket.socket, echo: bool = True) -> str:
        """
        Read one line from the socket, byte by byte, through _recv_byte.

        CR LF, CR NUL (RFC 854) and a bare LF all terminate the line.
        Read timeouts are retried; EOF ends the line early.

        :param sock: client socket
        :param echo: if True, echo each byte back to the client
        :return: the line read (without trailing CR/LF)
        """
        collected = bytearray()
        while True:
            try:
                data = self._recv_byte(sock)
            except TimeoutError:
                continue
            if data is None:
                break
            if data in (b"\r", b"\n"):
                if echo:
                    sock.sendall(b"\r\n")
                if data == b"\r":
                    # Swallow the byte that follows CR (LF or NUL per RFC 854).
                    # Whatever follows is discarded; a timeout here just means
                    # nothing followed.
                    with contextlib.suppress(TimeoutError):
                        self._recv_byte(sock)
                break
            if echo:
                sock.sendall(data)
            collected += data
        return collected.decode("utf-8", errors="replace")

    def _authenticate(self, sock: socket.socket) -> bool:
        """
        Prompt for user name and password and check them.

        The user name is echoed back while it is typed; the password is not.

        :param sock: client socket
        :return: True if authentication succeeded
        """
        sock.sendall(b"Username: ")
        entered_user = self._read_line(sock, echo=True)
        sock.sendall(b"Password: ")
        entered_password = self._read_line(sock, echo=False)
        sock.sendall(b"\r\n")

        if entered_user != self.username:
            return False
        return entered_password == self.password

    # ------------------------------------------------------------------
    # Tap functions (socket ↔ shell bridge)
    # ------------------------------------------------------------------

    def socket_to_shell_tap(
        self,
        sock: socket.socket,
        shell_stdin: TapIO,
        shell_replied_event: threading.Event,
        run_srv: threading.Event,
        *,
        shell_stdout: TapIO | None = None,
    ) -> None:
        """Read bytes from socket and forward complete lines to shell stdin.

        When *shell_stdout* is provided, the newline echo (``\\r\\n``) for
        line-ending bytes is written to *shell_stdout* instead of being sent
        directly to the socket.  ``shell_to_socket_tap`` then batches the
        echo together with the shell's response into a single ``sendall()``,
        preventing a race where the client reads the echo and the prompt as
        separate TCP segments (#94).

        Bytes that are not valid UTF-8 are replaced rather than raising, the
        same way ``_read_line`` decodes its input, so a malformed line cannot
        kill this thread before it signals the other threads to stop.

        :param sock: client socket
        :param shell_stdin: sink that receives each completed line
        :param shell_replied_event: event waited on before each byte is
            processed and cleared after a line is handed to the shell
        :param run_srv: per-connection run flag; cleared when this tap exits
        :param shell_stdout: optional sink for the newline echo
        """
        buffer: io.BytesIO = io.BytesIO()
        skip_lf = False  # Set after \r to consume the trailing \n of CR LF
        while run_srv.is_set():
            try:
                byte = self._recv_byte(sock)
            except TimeoutError:
                continue
            except OSError:
                log.error("telnet_server.socket_to_shell_tap socket read error")
                break

            # EOF / connection closed
            if byte is None:
                break

            # Drop NUL bytes completely.
            # CR NUL is a complete sequence (RFC 854), so reset skip_lf.
            if byte == b"\x00":
                skip_lf = False
                continue

            # Consume the LF half of a CR LF pair (RFC 854).
            if skip_lf:
                skip_lf = False
                if byte == b"\n":
                    continue

            # Wait for the shell to reply, but check run_srv periodically
            # so that shutdown is not blocked for the full wait duration.
            while not shell_replied_event.wait(timeout=_SHUTDOWN_TIMEOUT):
                if not run_srv.is_set():
                    break
            if not run_srv.is_set():
                break

            try:
                if byte in (b"\r", b"\n"):
                    skip_lf = byte == b"\r"
                    if shell_stdout is not None:
                        shell_stdout.write("\r\n")
                    else:
                        sock.sendall(b"\r\n")
                    log.debug("telnet_server.socket_to_shell_tap echoing newline")
                    buffer.write(byte)
                    buffer.seek(0)
                    # errors="replace": the client controls these bytes (e.g.
                    # a literal 0xFF from IAC IAC), and a strict decode would
                    # raise UnicodeDecodeError past the OSError handler below.
                    line = buffer.read().decode(encoding="utf-8", errors="replace")
                    buffer.seek(0)
                    buffer.truncate()
                    log.debug(
                        "telnet_server.socket_to_shell_tap sending line to shell: %s",
                        [line],
                    )
                    shell_stdin.write(line)
                    shell_replied_event.clear()
                else:
                    sock.sendall(byte)
                    log.debug(
                        "telnet_server.socket_to_shell_tap echoing byte to socket: %s",
                        [byte],
                    )
                    buffer.write(byte)
            except OSError as e:
                log.error("telnet_server.socket_to_shell_tap socket write error: %s", e)
                break

        # Signal all threads to stop
        run_srv.clear()

    def shell_to_socket_tap(
        self,
        sock: socket.socket,
        shell_stdout: TapIO,
        shell_replied_event: threading.Event,
        run_srv: threading.Event,
    ) -> None:
        """Read lines from shell stdout and send them to the socket.

        After reading the first available line, a brief coalescing pause
        (1 ms) allows the shell to enqueue additional output (e.g. a prompt
        following a newline echo).  All available lines are then sent in a
        single ``sendall()`` call so the client receives them in one TCP
        segment, avoiding a race where echo and prompt arrive separately
        (#94, mirrors SSH fix in #87).

        When the first line is whitespace-only (a newline echo from
        ``socket_to_shell_tap``) and nothing else is queued yet, the function
        blocks on a second ``readline()`` instead of sending the echo alone,
        so the echo and the output that follows it go out in the same
        ``sendall()`` call.

        The loop ends when ``run_srv`` is cleared, ``readline()`` returns an
        empty value, or a socket write fails; ``run_srv`` is cleared on exit.

        :param sock: client socket the batches are written to
        :param shell_stdout: source of shell output (``readline()`` / ``drain()``)
        :param shell_replied_event: set after every batch that was sent
        :param run_srv: per-connection run flag
        """
        while run_srv.is_set():
            line = shell_stdout.readline()
            if not line:
                break

            # Coalesce: brief pause lets the shell enqueue the prompt
            # after the newline echo, so both are sent in one segment.
            time.sleep(0.001)

            batch = process_tap_line(line)
            drained = shell_stdout.drain()

            if not drained and batch.strip() == "":
                # Echo-only batch (e.g. "\r\n") with nothing else queued.
                # The shell hasn't produced the prompt yet — block until it
                # does so we never send a bare echo as a separate segment.
                next_line = shell_stdout.readline()
                if next_line:
                    batch += process_tap_line(next_line)
                    # Drain any extra items that arrived alongside the prompt.
                    time.sleep(0.001)
                    drained = shell_stdout.drain()

            for extra in drained:
                # Separate consecutive lines that don't end with a newline
                # (e.g. prompts) so they aren't concatenated into one string
                # which breaks netmiko's find_prompt().
                if batch and not batch.endswith("\n"):
                    batch += "\r\n"
                batch += process_tap_line(extra)

            log.debug("telnet_server.shell_to_socket_tap sending batch to socket: %s", [batch])
            # Re-check the run flag: it may have been cleared while this
            # thread was blocked in readline() or sleeping above.
            if not run_srv.is_set():
                break
            try:
                sock.sendall(batch.encode(encoding="utf-8"))
            except OSError as e:
                log.error("telnet_server.shell_to_socket_tap socket write error: %s", e)
                break
            # Unblocks socket_to_shell_tap, which waits on this event before
            # processing each input byte.
            shell_replied_event.set()

        # Signal all threads to stop
        run_srv.clear()

    # ------------------------------------------------------------------
    # Watchdog
    # ------------------------------------------------------------------

    def watchdog(
        self,
        is_running: threading.Event,
        run_srv: threading.Event,
        shell: Any,
    ) -> None:
        """Wait for the connection or the server to stop, then stop the shell.

        Polling continues while both ``run_srv`` (cleared by a tap function
        on client disconnect) and ``is_running`` (cleared on server-wide
        shutdown) are set.  Whichever one ends the loop, ``shell.stop()`` is
        called so that ``CMDShell.cmdloop()`` unblocks and
        ``connection_function`` can return.

        :param is_running: server-wide run flag
        :param run_srv: per-connection run flag
        :param shell: object whose ``stop()`` is called when polling ends
        """
        while run_srv.is_set() and is_running.is_set():
            # The sleep is capped at _SHUTDOWN_TIMEOUT regardless of the
            # configured watchdog interval.
            time.sleep(min(self.watchdog_interval, _SHUTDOWN_TIMEOUT))
        shell.stop()

    # ------------------------------------------------------------------
    # Connection handler
    # ------------------------------------------------------------------

    def connection_function(self, client: socket.socket, is_running: threading.Event) -> None:
        """Serve one accepted Telnet client from negotiation to shell exit.

        Steps: offer SGA and ECHO, drain and answer the client's initial
        negotiation, send the banner, authenticate, then start the two tap
        threads, the shell and the watchdog.  The call blocks in
        ``client_shell.start()`` until the shell exits.  On every exit path
        ``run_srv`` is cleared and the client socket is closed.

        :param client: accepted client socket
        :param is_running: server-wide run flag passed on to the shell and
            the watchdog
        """
        shell_replied_event = threading.Event()
        # run_srv is the per-connection stop flag shared by the tap threads
        # and the watchdog; it is cleared in the finally block below.
        run_srv = threading.Event()
        run_srv.set()

        try:
            client.settimeout(self.timeout)

            # Initiate Telnet negotiation: character-at-a-time mode
            client.sendall(bytes([IAC, WILL, SGA]))
            client.sendall(bytes([IAC, WILL, ECHO]))

            # Give the client a moment to send initial IAC responses,
            # then drain them using _recv_byte so that negotiation commands
            # (e.g. DO SGA, DO ECHO, WILL NAWS) are properly answered via
            # _handle_negotiation instead of being silently discarded.
            # A short blocking timeout is used instead of non-blocking mode
            # so that multi-byte IAC sequences split across TCP segments
            # are received completely rather than raising mid-sequence.
            # NOTE(review): any plain data byte that arrives during this
            # drain is read and dropped as well.
            time.sleep(0.1)
            client.settimeout(_IAC_DRAIN_TIMEOUT)
            try:
                while True:
                    if self._recv_byte(client) is None:
                        break  # EOF — client disconnected
            except TimeoutError:
                pass  # No more data available — expected
            finally:
                # Restore the regular timeout whatever ended the drain.
                client.settimeout(self.timeout)

            # Send banner
            if self.banner:
                client.sendall((self.banner + "\r\n").encode("utf-8"))

            # Authenticate
            try:
                auth_ok = self._authenticate(client)
            except (TimeoutError, OSError):
                log.debug("Client disconnected during authentication")
                return
            if not auth_ok:
                log.warning("Telnet authentication failed, closing connection")
                # Best effort: the client may already be gone.
                with contextlib.suppress(OSError):
                    client.sendall(b"Authentication failed.\r\n")
                return

            # Create stdio for the shell
            shell_stdin, shell_stdout = TapIO(run_srv), TapIO(run_srv)

            # Start socket→shell tap thread
            socket_to_shell_tapper = threading.Thread(
                target=self.socket_to_shell_tap,
                args=(client, shell_stdin, shell_replied_event, run_srv),
                kwargs={"shell_stdout": shell_stdout},
                daemon=True,
            )
            socket_to_shell_tapper.start()

            # Start shell→socket tap thread
            shell_to_socket_tapper = threading.Thread(
                target=self.shell_to_socket_tap,
                args=(client, shell_stdout, shell_replied_event, run_srv),
                daemon=True,
            )
            shell_to_socket_tapper.start()

            # Create the client shell
            client_shell = self.shell(
                stdin=shell_stdin,
                stdout=shell_stdout,
                nos=self.nos,
                nos_inventory_config=self.nos_inventory_config,
                is_running=is_running,
                **self.shell_configuration,
            )

            # Start watchdog thread
            watchdog_thread = threading.Thread(
                target=self.watchdog,
                args=(is_running, run_srv, client_shell),
                daemon=True,
            )
            watchdog_thread.start()

            # Block until shell exits
            client_shell.start()
            log.debug("TelnetServer.connection_function stopped shell thread")

        finally:
            # Stop all server threads
            run_srv.clear()
            log.debug("TelnetServer.connection_function stopped server threads")

            with contextlib.suppress(OSError):
                client.close()
            log.debug("TelnetServer.connection_function closed socket")

shell_to_socket_tap(sock, shell_stdout, shell_replied_event, run_srv)

Read lines from shell stdout and send them to the socket.

After reading the first available line, a brief coalescing pause (1 ms) allows the shell to enqueue additional output (e.g. a prompt following a newline echo). All available lines are then sent in a single sendall() call so the client receives them in one TCP segment, avoiding a race where echo and prompt arrive separately (#94, mirrors SSH fix in #87).

When the first line is whitespace-only (a newline echo from socket_to_shell_tap), the function blocks on a second readline() instead of sending the echo alone. This guarantees the echo and the prompt that follows it are always delivered in the same sendall() call.

Source code in simnos/plugins/servers/telnet_server.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def shell_to_socket_tap(
    self,
    sock: socket.socket,
    shell_stdout: TapIO,
    shell_replied_event: threading.Event,
    run_srv: threading.Event,
) -> None:
    """Read lines from shell stdout and send them to the socket.

    After reading the first available line, a brief coalescing pause
    (1 ms) allows the shell to enqueue additional output (e.g. a prompt
    following a newline echo).  All available lines are then sent in a
    single ``sendall()`` call so the client receives them in one TCP
    segment, avoiding a race where echo and prompt arrive separately
    (#94, mirrors SSH fix in #87).

    When the first line is whitespace-only (a newline echo from
    ``socket_to_shell_tap``), the function blocks on a second
    ``readline()`` instead of sending the echo alone.  This guarantees
    the echo and the prompt that follows it are always delivered in the
    same ``sendall()`` call.

    Args:
        sock: Client socket that receives each UTF-8 encoded batch.
        shell_stdout: Source of shell output; ``readline()`` supplies the
            next item and ``drain()`` returns whatever else is queued.
        shell_replied_event: Set after every successful ``sendall()``;
            ``socket_to_shell_tap`` waits on it before handling input.
        run_srv: Loop guard.  It is cleared when this function returns,
            whatever the reason for leaving the loop.
    """
    while run_srv.is_set():
        line = shell_stdout.readline()
        # A falsy line means there is nothing more to read — stop tapping.
        if not line:
            break

        # Coalesce: brief pause lets the shell enqueue the prompt
        # after the newline echo, so both are sent in one segment.
        time.sleep(0.001)

        batch = process_tap_line(line)
        # Collect everything that was queued during the pause.
        drained = shell_stdout.drain()

        if not drained and batch.strip() == "":
            # Echo-only batch (e.g. "\r\n") with nothing else queued.
            # The shell hasn't produced the prompt yet — block until it
            # does so we never send a bare echo as a separate segment.
            next_line = shell_stdout.readline()
            if next_line:
                batch += process_tap_line(next_line)
                # Drain any extra items that arrived alongside the prompt.
                time.sleep(0.001)
                drained = shell_stdout.drain()

        for extra in drained:
            # Separate consecutive lines that don't end with a newline
            # (e.g. prompts) so they aren't concatenated into one string
            # which breaks netmiko's find_prompt().
            if batch and not batch.endswith("\n"):
                batch += "\r\n"
            batch += process_tap_line(extra)

        log.debug("telnet_server.shell_to_socket_tap sending batch to socket: %s", [batch])
        # Re-check the guard: it may have been cleared while we were blocked.
        if not run_srv.is_set():
            break
        try:
            sock.sendall(batch.encode(encoding="utf-8"))
        except OSError as e:
            log.error("telnet_server.shell_to_socket_tap socket write error: %s", e)
            break
        # Tell the input tap that the shell's reply has been delivered.
        shell_replied_event.set()

    # Signal all threads to stop
    run_srv.clear()

socket_to_shell_tap(sock, shell_stdin, shell_replied_event, run_srv, *, shell_stdout=None)

Read bytes from socket and forward complete lines to shell stdin.

When shell_stdout is provided, the newline echo (\r\n) for line-ending bytes is written to shell_stdout instead of being sent directly to the socket. shell_to_socket_tap then batches the echo together with the shell's response into a single sendall(), preventing a race where the client reads the echo and the prompt as separate TCP segments (#94).

Source code in simnos/plugins/servers/telnet_server.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def socket_to_shell_tap(
    self,
    sock: socket.socket,
    shell_stdin: TapIO,
    shell_replied_event: threading.Event,
    run_srv: threading.Event,
    *,
    shell_stdout: TapIO | None = None,
) -> None:
    """Read bytes from socket and forward complete lines to shell stdin.

    When *shell_stdout* is provided, the newline echo (``\\r\\n``) for
    line-ending bytes is written to *shell_stdout* instead of being sent
    directly to the socket.  ``shell_to_socket_tap`` then batches the
    echo together with the shell's response into a single ``sendall()``,
    preventing a race where the client reads the echo and the prompt as
    separate TCP segments (#94).

    Bytes that are not valid UTF-8 are decoded with the replacement
    character instead of raising, so a client sending arbitrary bytes
    cannot kill this thread with an uncaught ``UnicodeDecodeError``.

    Args:
        sock: Client socket to read from (and to echo typed bytes to).
        shell_stdin: Receives each completed line, terminator included.
        shell_replied_event: Waited on before every byte is handled and
            cleared after a line has been handed to the shell.
        run_srv: Loop guard.  It is cleared when this function returns.
        shell_stdout: Optional sink for the newline echo (see above).
    """
    buffer: io.BytesIO = io.BytesIO()
    skip_lf = False  # Set after \r to consume the trailing \n of CR LF
    while run_srv.is_set():
        try:
            byte = self._recv_byte(sock)
        except TimeoutError:
            # Nothing received yet — loop again so run_srv is re-checked.
            continue
        except OSError as e:
            log.error("telnet_server.socket_to_shell_tap socket read error: %s", e)
            break

        # EOF / connection closed
        if byte is None:
            break

        # Drop NUL bytes completely.
        # CR NUL is a complete sequence (RFC 854), so reset skip_lf.
        if byte == b"\x00":
            skip_lf = False
            continue

        # Consume the LF half of a CR LF pair (RFC 854).
        if skip_lf:
            skip_lf = False
            if byte == b"\n":
                continue

        # Wait for the shell to reply, but check run_srv periodically
        # so that shutdown is not blocked for the full wait duration.
        while not shell_replied_event.wait(timeout=_SHUTDOWN_TIMEOUT):
            if not run_srv.is_set():
                break
        if not run_srv.is_set():
            break

        try:
            if byte in (b"\r", b"\n"):
                skip_lf = byte == b"\r"
                if shell_stdout is not None:
                    shell_stdout.write("\r\n")
                else:
                    sock.sendall(b"\r\n")
                log.debug("telnet_server.socket_to_shell_tap echoing newline")
                buffer.write(byte)
                # errors="replace": input comes from an untrusted client and
                # may contain bytes that are not valid UTF-8.
                line = buffer.getvalue().decode(encoding="utf-8", errors="replace")
                # Reset the buffer for the next line.
                buffer.seek(0)
                buffer.truncate()
                log.debug(
                    "telnet_server.socket_to_shell_tap sending line to shell: %s",
                    [line],
                )
                shell_stdin.write(line)
                # The next byte must wait until the shell has answered.
                shell_replied_event.clear()
            else:
                # Ordinary character: echo it back and keep accumulating.
                sock.sendall(byte)
                log.debug(
                    "telnet_server.socket_to_shell_tap echoing byte to socket: %s",
                    [byte],
                )
                buffer.write(byte)
        except OSError as e:
            log.error("telnet_server.socket_to_shell_tap socket write error: %s", e)
            break

    # Signal all threads to stop
    run_srv.clear()

watchdog(is_running, run_srv, shell)

Monitor server liveness and ensure shell stops on disconnect.

The loop exits when either run_srv is cleared (client disconnect detected by a tap function) or is_running is cleared (server-wide shutdown). In both cases shell.stop() must be called so that CMDShell.cmdloop() unblocks and connection_function can return.

Source code in simnos/plugins/servers/telnet_server.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
def watchdog(
    self,
    is_running: threading.Event,
    run_srv: threading.Event,
    shell: Any,
) -> None:
    """Poll the liveness flags and stop the shell once either one drops.

    Polling continues while both ``run_srv`` (per-connection flag, cleared
    by the tap functions) and ``is_running`` (server-wide flag) are set.
    As soon as one of them is cleared, ``shell.stop()`` is invoked so the
    shell's command loop can unblock and the connection can be torn down.
    """
    while run_srv.is_set() and is_running.is_set():
        # Never sleep longer than the shutdown timeout between checks.
        time.sleep(min(self.watchdog_interval, _SHUTDOWN_TIMEOUT))
    # Reached for both exit reasons: the shell is stopped unconditionally.
    shell.stop()

Internal

TapIO

Thread-safe I/O bridge shared by both SSH and Telnet server plugins. This is an internal helper, not a public API.

Bases: StringIO

A StringIO subclass with a blocking readline() method and a deque that buffers the values passed to write().

Uses collections.deque for thread-safe, O(1) append/pop operations (CPython's GIL guarantees atomicity for deque append/pop).

A threading.Condition is used to wake readline() immediately when write() adds data, eliminating the polling delay that caused intermittent empty output in netmiko send_command() (#87).

Source code in simnos/plugins/servers/tap_io.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class TapIO(io.StringIO):
    """
    ``StringIO`` subclass whose ``readline()`` blocks and whose ``write()``
    queues each value in a deque instead of the underlying text buffer.

    Values are pushed on the left of the deque and consumed from the right,
    so readers see them in the order they were written.  The docstring of
    the original notes that ``collections.deque`` ``append``/``pop`` are
    atomic under CPython's GIL.

    A ``threading.Condition`` wakes a waiting ``readline()`` as soon as
    ``write()`` queues data, removing the polling delay that caused
    intermittent empty output in netmiko ``send_command()`` (#87).
    """

    def __init__(self, run_srv: threading.Event, initial_value: str = "", newline: str = "\n"):
        # Attributes are created before the base initialiser runs.
        self.lines: deque[str] = deque()
        self.run_srv: threading.Event = run_srv
        self._cond: threading.Condition = threading.Condition()
        super().__init__(initial_value, newline)

    def readline(self):
        """Return the oldest queued value, waiting while the server runs.

        Once ``run_srv`` is cleared, one remaining queued value is still
        returned if present; otherwise the empty string is returned.
        """
        with self._cond:
            while self.run_srv.is_set():
                if not self.lines:
                    # Sleep until write() notifies, or re-check after 0.1 s.
                    self._cond.wait(timeout=0.1)
                    continue
                return self.lines.pop()
        return self.lines.pop() if self.lines else ""

    def drain(self) -> list[str]:
        """Remove and return every queued value without waiting.

        The result is ordered oldest first (FIFO).
        """
        collected: list[str] = []
        while True:
            if not self.lines:
                return collected
            collected.append(self.lines.pop())

    def write(self, value: str):
        """Queue *value* and wake one ``readline()`` waiter, if any."""
        self.lines.appendleft(value)
        waiters = self._cond
        with waiters:
            waiters.notify()

drain()

Pop all buffered lines without blocking.

Returns a list in FIFO order (oldest first).

Source code in simnos/plugins/servers/tap_io.py
43
44
45
46
47
48
49
50
51
def drain(self) -> list[str]:
    """Remove and return every queued value without waiting.

    The result is ordered oldest first (FIFO).
    """
    collected: list[str] = []
    while True:
        if not self.lines:
            return collected
        # Values are consumed from the right end of the deque.
        collected.append(self.lines.pop())

readline()

Block until a line is available or the server shuts down.

Source code in simnos/plugins/servers/tap_io.py
32
33
34
35
36
37
38
39
40
41
def readline(self):
    """Return the oldest queued value, waiting while the server runs.

    Once ``run_srv`` is cleared, one remaining queued value is still
    returned if present; otherwise the empty string is returned.
    """
    with self._cond:
        while self.run_srv.is_set():
            if not self.lines:
                # Sleep until notified, or re-check the flag after 0.1 s.
                self._cond.wait(timeout=0.1)
                continue
            return self.lines.pop()
    return self.lines.pop() if self.lines else ""

write(value)

Append value to the buffer and wake any blocked readline().

Source code in simnos/plugins/servers/tap_io.py
53
54
55
56
57
def write(self, value: str):
    """Queue *value* and wake one ``readline()`` waiter, if any."""
    # New values go on the left; readers pop from the right.
    self.lines.appendleft(value)
    waiters = self._cond
    with waiters:
        waiters.notify()

Shell Plugins

Shell plugins act as plumbing between server plugins and NOS plugins, connecting them together.

CMDShell

Bases: Cmd

Custom shell class to interact with NOS.

Source code in simnos/plugins/shell/cmd_shell.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class CMDShell(Cmd):
    """
    Custom shell class to interact with NOS.
    """

    use_rawinput = False

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        stdin,
        stdout,
        nos,
        nos_inventory_config,
        base_prompt,
        is_running,
        intro="Custom SSH Shell",
        ruler="",
        completekey="tab",
        newline="\r\n",
    ):
        self.nos: Nos = nos
        self.ruler = ruler
        self.intro = intro
        self.base_prompt = base_prompt
        self.newline = newline
        self.prompt = nos.initial_prompt.format(base_prompt=base_prompt)
        self.is_running = is_running

        # form commands
        self.commands = {
            **copy.deepcopy(BASIC_COMMANDS),
            **copy.deepcopy(nos.commands or {}),
            **copy.deepcopy(nos_inventory_config.get("commands", {})),
        }
        # call the base constructor of cmd.Cmd, with our own stdin and stdout
        super().__init__(
            completekey=completekey,
            stdin=stdin,
            stdout=stdout,
        )

    def start(self):
        """Method to start the shell"""
        self.cmdloop()

    def stop(self):
        """Method to stop the shell"""
        self.stdin.write("exit" + self.newline)

    def writeline(self, value):
        """Method to write a line to stdout with newline at the end"""
        for line in str(value).splitlines():
            self.stdout.write(line + self.newline)

    def do_EOF(self, line):
        """Handle EOF from readline — exit the shell gracefully."""
        return True

    def emptyline(self):
        """This method to do nothing if empty line entered"""

    def reload_commands(self, changed_files: list):
        """Method to reload commands"""
        for file in changed_files:
            self.nos.from_file(file)
            self.commands.update(self.nos.commands)

    def precmd(self, line):
        """Method to return line before processing the command"""
        if os.environ.get("SIMNOS_RELOAD_COMMANDS"):
            changed_files = get_files_changed(nos.__path__[0])
            if changed_files:
                log.debug("Reloading... Files changed: %s", changed_files)
                self.reload_commands(changed_files)
        return line

    # pylint: disable=unused-argument
    def postcmd(self, stop, line):
        """Method to return stop value to stop the shell"""
        return stop

    # pylint: disable=unused-argument
    def do_help(self, arg):
        """Method to return help for commands"""
        lines = {}  # dict of {cmd: cmd_help}
        width = 0  # record longest command width for padding
        # form help for all commands
        for cmd, cmd_data in self.commands.items():
            # skip special commands
            if cmd.startswith("_") and cmd.endswith("_"):
                continue
            # skip commands that does not match current prompt
            if not self._check_prompt(cmd_data.get("prompt")):
                continue
            lines[cmd] = cmd_data.get("help", "")
            width = max(width, len(cmd))
        # form help lines
        help_msg = []
        for k, v in lines.items():
            padding = " " * (width - len(k)) + "  "
            help_msg.append(f"{k}{padding}{v}")
        self.writeline(self.newline.join(help_msg))

    def _check_prompt(self, prompt_: str | list[str]):
        """
        Helper method to check if prompt_ matches current prompt

        :param prompt_: (string or None)  prompt to check
        """
        # prompt_ is None if no 'prompt' key defined for command
        if prompt_ is None:
            return True
        if isinstance(prompt_, str):
            return self.prompt == prompt_.format(base_prompt=self.base_prompt)
        return any(self.prompt == i.format(base_prompt=self.base_prompt) for i in prompt_)

    # pylint: disable=too-many-branches
    def default(self, line):
        """Method called if no do_xyz methods found"""
        log.debug("shell.default '%s' running command '%s'", self.base_prompt, [line])
        ret = self.commands["_default_"]["output"]
        try:
            cmd_data = self.commands[line]
            if "alias" in cmd_data:
                cmd_data = {**self.commands[cmd_data["alias"]], **cmd_data}
            if self._check_prompt(cmd_data.get("prompt")):
                if cmd_data.get("exit"):
                    return True
                ret = cmd_data.get("output")
                if callable(ret):
                    ret = ret(
                        self.nos.device,
                        base_prompt=self.base_prompt,
                        current_prompt=self.prompt,
                        command=line,
                    )
                    if isinstance(ret, dict):
                        if "new_prompt" in ret:
                            self.prompt = ret["new_prompt"].format(base_prompt=self.base_prompt)
                        if ret.get("exit"):
                            return True
                        ret = ret.get("output")
                if "new_prompt" in cmd_data:
                    self.prompt = cmd_data["new_prompt"].format(base_prompt=self.base_prompt)
            else:
                log.warning(
                    "'%s' command prompt '%s' not matching current prompt '%s'",
                    line,
                    (
                        ", ".join(cmd_data.get("prompt", []))
                        if isinstance(cmd_data.get("prompt"), list)
                        else cmd_data.get("prompt", "")
                    ),
                    self.prompt,
                )
        except KeyError:
            log.error("shell.default '%s' command '%s' not found", self.base_prompt, [line])
            if callable(ret):
                ret = "An error occurred related to the command function"
        # pylint: disable=broad-except
        except ValueError:
            log.error("Output is still a callable")
            ret = "An error occurred"
        except Exception as e:
            log.error("An error occurred: %s", str(e))
            ret = traceback.format_exc()
            ret = ret.replace("\n", self.newline)
        if not self.is_running.is_set():
            return True
        if ret is not None:
            try:
                ret = ret.format(base_prompt=self.base_prompt)
            except KeyError:
                log.error("Error in formatting output")
            self.writeline(ret)
        return False

default(line)

Method called if no do_xyz methods found

Source code in simnos/plugins/shell/cmd_shell.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def default(self, line):
    """Run *line* as a NOS command; called if no do_xyz method is found.

    The command entry may define ``alias`` (merge another entry underneath
    this one), ``prompt`` (required current prompt), ``exit`` (stop the
    shell), ``output`` (string or callable) and ``new_prompt``.  A callable
    output may return a dict with ``output``, ``new_prompt`` and ``exit``
    keys.

    :param line: the command line as entered
    :return: True to stop the shell, False to keep it running
    """
    log.debug("shell.default '%s' running command '%s'", self.base_prompt, [line])
    # Fallback output, used for unknown commands and prompt mismatches.
    ret = self.commands["_default_"]["output"]
    try:
        cmd_data = self.commands[line]
        if "alias" in cmd_data:
            cmd_data = {**self.commands[cmd_data["alias"]], **cmd_data}
        if self._check_prompt(cmd_data.get("prompt")):
            if cmd_data.get("exit"):
                return True
            ret = cmd_data.get("output")
            if callable(ret):
                ret = ret(
                    self.nos.device,
                    base_prompt=self.base_prompt,
                    current_prompt=self.prompt,
                    command=line,
                )
                if isinstance(ret, dict):
                    if "new_prompt" in ret:
                        self.prompt = ret["new_prompt"].format(base_prompt=self.base_prompt)
                    if ret.get("exit"):
                        return True
                    ret = ret.get("output")
            if "new_prompt" in cmd_data:
                self.prompt = cmd_data["new_prompt"].format(base_prompt=self.base_prompt)
        else:
            log.warning(
                "'%s' command prompt '%s' not matching current prompt '%s'",
                line,
                (
                    ", ".join(cmd_data.get("prompt", []))
                    if isinstance(cmd_data.get("prompt"), list)
                    else cmd_data.get("prompt", "")
                ),
                self.prompt,
            )
    except KeyError:
        log.error("shell.default '%s' command '%s' not found", self.base_prompt, [line])
        if callable(ret):
            ret = "An error occurred related to the command function"
    except ValueError:
        log.error("Output is still a callable")
        ret = "An error occurred"
    except Exception as e:  # pylint: disable=broad-except
        log.error("An error occurred: %s", str(e))
        ret = traceback.format_exc()
        ret = ret.replace("\n", self.newline)
    if not self.is_running.is_set():
        return True
    if ret is not None:
        # Only strings can be templated; anything else is written as-is.
        if isinstance(ret, str):
            try:
                ret = ret.format(base_prompt=self.base_prompt)
            except (KeyError, IndexError, ValueError):
                # Device output may legitimately contain braces that are
                # not valid format fields; emit it unformatted.
                log.error("Error in formatting output")
        self.writeline(ret)
    return False

do_EOF(line)

Handle EOF from readline — exit the shell gracefully.

Source code in simnos/plugins/shell/cmd_shell.py
82
83
84
def do_EOF(self, line):
    """End the shell session when readline reports EOF."""
    # A true return value tells the command loop to stop.
    return True

do_help(arg)

Method to return help for commands

Source code in simnos/plugins/shell/cmd_shell.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def do_help(self, arg):
    """Write one aligned help line per command usable at the current prompt."""
    # Keep regular commands only: names wrapped in underscores are special,
    # and commands bound to another prompt are hidden.
    visible = {
        name: spec.get("help", "")
        for name, spec in self.commands.items()
        if not (name.startswith("_") and name.endswith("_"))
        and self._check_prompt(spec.get("prompt"))
    }
    # Pad every command name to the longest one, plus a two-space gutter.
    width = max(map(len, visible), default=0)
    rows = [f"{name.ljust(width)}  {text}" for name, text in visible.items()]
    self.writeline(self.newline.join(rows))

emptyline()

This method does nothing when an empty line is entered.

Source code in simnos/plugins/shell/cmd_shell.py
86
87
def emptyline(self):
    """Ignore empty input lines instead of acting on them."""
    return None

postcmd(stop, line)

Method to return stop value to stop the shell

Source code in simnos/plugins/shell/cmd_shell.py
105
106
107
def postcmd(self, stop, line):
    """Pass the command's stop flag through unchanged."""
    # No post-processing: whatever the command returned decides whether
    # the shell keeps running.
    return stop

precmd(line)

Method to return line before processing the command

Source code in simnos/plugins/shell/cmd_shell.py
 95
 96
 97
 98
 99
100
101
102
def precmd(self, line):
    """Hook run before each command; always hands *line* back untouched.

    If the ``SIMNOS_RELOAD_COMMANDS`` environment variable is set, NOS
    files that changed are reloaded before the command is processed.
    """
    if not os.environ.get("SIMNOS_RELOAD_COMMANDS"):
        return line
    changed_files = get_files_changed(nos.__path__[0])
    if changed_files:
        log.debug("Reloading... Files changed: %s", changed_files)
        self.reload_commands(changed_files)
    return line

reload_commands(changed_files)

Method to reload commands

Source code in simnos/plugins/shell/cmd_shell.py
89
90
91
92
93
def reload_commands(self, changed_files: list):
    """Re-read each changed file into the NOS and merge its commands.

    :param changed_files: paths passed one by one to ``self.nos.from_file``
    """
    for file in changed_files:
        self.nos.from_file(file)
        # ``nos.commands`` may be empty/None (the constructor guards it the
        # same way); treat that as "nothing to merge" instead of crashing.
        self.commands.update(self.nos.commands or {})

start()

Method to start the shell

Source code in simnos/plugins/shell/cmd_shell.py
69
70
71
def start(self):
    """Run the shell's command loop; returns once the loop ends."""
    self.cmdloop()

stop()

Method to stop the shell

Source code in simnos/plugins/shell/cmd_shell.py
73
74
75
def stop(self):
    """Ask the shell to terminate by injecting an ``exit`` line on stdin."""
    exit_line = f"exit{self.newline}"
    self.stdin.write(exit_line)

writeline(value)

Method to write a line to stdout with newline at the end

Source code in simnos/plugins/shell/cmd_shell.py
77
78
79
80
def writeline(self, value):
    """Write *value* to stdout line by line, each ending with ``self.newline``."""
    text = str(value)
    # Split on any line boundary so every line gets the shell's terminator.
    for chunk in text.splitlines():
        self.stdout.write(f"{chunk}{self.newline}")

Tape Plugins

Idea - Tape Plugins will allow recording interactions with real devices and building NOS plugins automatically from the gathered data.