Advanced Command Generator

Walk Agent, limit number of packets

Send a series of SNMP GETBULK requests using the following options:

  • with SNMPv3, user ‘usr-none-none’, no authentication, no privacy
  • over IPv4/UDP
  • to an Agent at demo.snmplabs.com:161
  • for all OIDs past SNMPv2-MIB::system
  • run until the Agent reports an end-of-MIB condition OR maxCalls == 10 request-response interactions have occurred

Functionally similar to:

$ snmpbulkwalk -v3 -lnoAuthNoPriv -u usr-none-none -Cn0 -Cr50 demo.snmplabs.com SNMPv2-MIB::system

from pysnmp.hlapi import *

for (errorIndication,
     errorStatus,
     errorIndex,
     varBinds) in bulkCmd(SnmpEngine(),
                          UsmUserData('usr-none-none'),
                          UdpTransportTarget(('demo.snmplabs.com', 161)),
                          ContextData(),
                          0, 50,
                          ObjectType(ObjectIdentity('SNMPv2-MIB', 'system')),
                          maxCalls=10):

    if errorIndication:
        print(errorIndication)
        break
    elif errorStatus:
        print('%s at %s' % (errorStatus.prettyPrint(),
                            errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
        break
    else:
        for varBind in varBinds:
            print(' = '.join([x.prettyPrint() for x in varBind]))
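
The stopping rule used above (end-of-MIB or maxCalls exchanges, whichever comes first) can be illustrated without a network. The following is a minimal sketch using only the standard library; `fake_bulk_walk` and its in-memory `mib` list are hypothetical stand-ins for the Agent and are not part of pysnmp.

```python
def fake_bulk_walk(mib, max_repetitions, max_calls=0):
    """Yield rows from `mib` in batches, mimicking a GETBULK walk.

    Stops when the simulated MIB is exhausted or after `max_calls`
    request-response exchanges (0 means no limit).
    """
    position = 0
    calls = 0
    while position < len(mib):
        if max_calls and calls >= max_calls:
            break  # packet budget spent before end-of-MIB was reached
        batch = mib[position:position + max_repetitions]
        calls += 1  # one simulated request-response exchange
        for row in batch:
            yield row
        position += len(batch)


# Hypothetical Agent holding 1000 rows
mib = ['oid.%d' % i for i in range(1000)]
rows = list(fake_bulk_walk(mib, max_repetitions=50, max_calls=10))
print(len(rows))  # 500
```

In this model, 50 repetitions per request and a budget of 10 calls cap the walk at 500 rows, even though the simulated Agent has more to offer.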


Sequence of GETs

Send two SNMP GET requests in a row using the following options:

  • with SNMPv3, user ‘usr-md5-none’, MD5 authentication, no privacy
  • over IPv4/UDP
  • to an Agent at demo.snmplabs.com:161
  • for IF-MIB::ifInOctets.1 and IF-MIB::ifOutOctets.1 MIB objects

Use a queue of MIB objects to query.

The next() call advances the Python generator (iterator) to the point where it can accept input through send().
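
Why the priming call is needed can be seen with a plain Python generator. The sketch below uses only the standard library; `echo_queries` is a hypothetical toy that stands in for the command generator and fabricates its responses.

```python
def echo_queries():
    """Toy generator: receives a query via send(), yields a fake response."""
    response = None
    while True:
        # Execution pauses at this yield; send(value) resumes it with
        # query bound to value.
        query = yield response
        response = 'response to %s' % query


gen = echo_queries()
next(gen)  # run up to the first yield so the generator can accept input

replies = [gen.send(q) for q in ('ifInOctets.1', 'ifOutOctets.1')]
print(replies)
```

Calling send() with a non-None value on a generator that has not been started raises TypeError, which is why the priming next() comes first.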

Functionally similar to:

$ snmpget -v3 -l authNoPriv -u usr-md5-none -A authkey1 demo.snmplabs.com IF-MIB::ifInOctets.1

from pysnmp.hlapi import *

queue = [[ObjectType(ObjectIdentity('IF-MIB', 'ifInOctets', 1))],
         [ObjectType(ObjectIdentity('IF-MIB', 'ifOutOctets', 1))]]

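# getCmd() returns a generator; the name avoids shadowing the built-in iter()
cmdGen = getCmd(SnmpEngine(),
                UsmUserData('usr-md5-none', 'authkey1'),
                UdpTransportTarget(('demo.snmplabs.com', 161)),
                ContextData())

# Advance the generator to its first yield so that it can accept send()
next(cmdGen)

while queue:
    errorIndication, errorStatus, errorIndex, varBinds = cmdGen.send(queue.pop())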
    if errorIndication:
        print(errorIndication)
    elif errorStatus:
        print('%s at %s' % (errorStatus.prettyPrint(),
                            errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
    else:
        for varBind in varBinds:
            print(' = '.join([x.prettyPrint() for x in varBind]))


Custom ContextEngineId

Send SNMP SET request using the following options:

  • with SNMPv3 with user ‘usr-md5-des’, MD5 auth and DES privacy protocols
  • use remote SNMP Engine ID 0x80004fb805636c6f75644dab22cc (USM autodiscovery will run)
  • over IPv4/UDP
  • to an Agent at demo.snmplabs.com:161
  • setting SNMPv2-MIB::sysName.0 to new value (type coerced from MIB)

Functionally similar to:

$ snmpset -v3 -u usr-md5-des -l authPriv -A authkey1 -X privkey1 -E 80004fb805636c6f75644dab22cc demo.snmplabs.com SNMPv2-MIB::sysName.0 = "new system name"

from pysnmp.hlapi import *

errorIndication, errorStatus, errorIndex, varBinds = next(
    setCmd(SnmpEngine(),
           UsmUserData('usr-md5-des', 'authkey1', 'privkey1',
                       securityEngineId=OctetString(hexValue='80004fb805636c6f75644dab22cc')),
           UdpTransportTarget(('demo.snmplabs.com', 161)),
           ContextData(),
           ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysName', 0), 'new system name'))
)

if errorIndication:
    print(errorIndication)
elif errorStatus:
    print('%s at %s' % (errorStatus.prettyPrint(),
                        errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
else:
    for varBind in varBinds:
        print(' = '.join([x.prettyPrint() for x in varBind]))
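
The hexadecimal engine ID used above follows the SnmpEngineID layout from RFC 3411: a four-octet word whose top bit flags the RFC 3411 format and whose remaining bits carry the enterprise number, then a format octet, then format-specific data. The sketch below decodes it with the standard library only; `parse_engine_id` is a hypothetical helper, not a pysnmp function.

```python
import struct


def parse_engine_id(hex_value):
    """Split an RFC 3411 style engine ID into its leading fields."""
    raw = bytes.fromhex(hex_value)
    (first_word,) = struct.unpack('>I', raw[:4])
    return {
        # Top bit set means the ID follows the RFC 3411 layout
        'rfc3411_layout': bool(first_word & 0x80000000),
        # Remaining 31 bits hold the IANA enterprise number
        'enterprise': first_word & 0x7FFFFFFF,
        # Fifth octet selects the format of the remaining data
        # (1 = IPv4, 2 = IPv6, 3 = MAC, 4 = text, 5 = octets)
        'format': raw[4],
        'data': raw[5:],
    }


info = parse_engine_id('80004fb805636c6f75644dab22cc')
print(info['enterprise'], info['format'], info['data'][:5])
```

For this engine ID the enterprise number decodes to 20408, the format octet is 5 (opaque octets), and the data begins with the ASCII bytes of "cloud".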


Custom ContextEngineId and ContextName

Send SNMP SET request using the following options:

  • with SNMPv3 with user ‘usr-md5-none’, MD5 auth and no privacy protocols
  • over IPv4/UDP
  • to an Agent at demo.snmplabs.com:161
  • addressing particular set of Managed Objects at remote SNMP Engine by:
      • contextEngineId 0x80004fb805636c6f75644dab22cc and
      • contextName ‘a172334d7d97871b72241397f713fa12’
  • setting SNMPv2-MIB::sysORDescr.1 to new value (type taken from MIB)

Functionally similar to:

$ snmpset -v3 -u usr-md5-none -l authNoPriv -A authkey1 -E 80004fb805636c6f75644dab22cc -n a172334d7d97871b72241397f713fa12 demo.snmplabs.com SNMPv2-MIB::sysORDescr.1 = "new system name"

from pysnmp.hlapi import *

errorIndication, errorStatus, errorIndex, varBinds = next(
    setCmd(SnmpEngine(),
           UsmUserData('usr-md5-none', 'authkey1'),
           UdpTransportTarget(('demo.snmplabs.com', 161)),
           ContextData(contextEngineId=OctetString(hexValue='80004fb805636c6f75644dab22cc'),
                       contextName='da761cfc8c94d3aceef4f60f049105ba'),
           ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysORDescr', 1), 'new system name'))
)

if errorIndication:
    print(errorIndication)
elif errorStatus:
    print('%s at %s' % (errorStatus.prettyPrint(),
                        errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
else:
    for varBind in varBinds:
        print(' = '.join([x.prettyPrint() for x in varBind]))
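
The error branch shared by the examples on this page relies on the `x and y or '?'` idiom to pick the variable binding that errorIndex points at (errorIndex is 1-based, and 0 means no particular binding). A more explicit form might look like the sketch below; `failing_oid` is a hypothetical helper, and plain tuples stand in for pysnmp variable bindings.

```python
def failing_oid(error_index, var_binds):
    """Return the name of the var-bind that errorIndex refers to, or '?'."""
    index = int(error_index)
    # errorIndex counts from 1; 0 or an out-of-range value means "unknown"
    if 1 <= index <= len(var_binds):
        return var_binds[index - 1][0]
    return '?'


# Stand-in variable bindings as (name, value) pairs
var_binds = [('sysName.0', 'new system name'), ('sysORDescr.1', 'x')]
print(failing_oid(2, var_binds))
print(failing_oid(0, var_binds))
```

Unlike the one-line idiom, this version also returns '?' when the index lies beyond the end of the list instead of raising IndexError.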


Custom SecurityName

Send SNMP GET request using the following options:

  • with SNMPv3, user ‘usr-md5-none’, securityName ‘myuser’, MD5 authentication, no privacy
  • over IPv4/UDP
  • to an Agent at demo.snmplabs.com:161
  • for an OID in text form

The securityName parameter can be thought of as an alias for userName: it lets you address a USM table row just as userName does. Unlike userName, securityName can be made human-readable, and because it is not an index in usmUserTable, duplicate securityName values are possible.
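
The difference between an index and a plain column can be pictured with a toy table. This is only an illustrative model built on a dictionary; it does not reflect how pysnmp stores USM users, and the engine IDs and the `rows_for_security_name` helper are made up for the example.

```python
# Toy usmUserTable: rows are indexed by (engine ID, userName), so that pair
# must be unique, while securityName is an ordinary column.
usm_user_table = {
    ('engine-a', 'usr-md5-none'): {'securityName': 'myuser'},
    ('engine-b', 'usr-md5-none'): {'securityName': 'myuser'},
    ('engine-a', 'usr-md5-des'): {'securityName': 'operator'},
}


def rows_for_security_name(table, security_name):
    """Return the index of every row carrying the given securityName."""
    return sorted(index for index, row in table.items()
                  if row['securityName'] == security_name)


print(rows_for_security_name(usm_user_table, 'myuser'))
```

Two rows share the securityName ‘myuser’ here, which the table allows because only the index pair has to be unique.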

from pysnmp.hlapi import *

errorIndication, errorStatus, errorIndex, varBinds = next(
    getCmd(SnmpEngine(),
           UsmUserData('usr-md5-none', 'authkey1', securityName='myuser'),
           UdpTransportTarget(('demo.snmplabs.com', 161)),
           ContextData(),
           ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')))
)

if errorIndication:
    print(errorIndication)
elif errorStatus:
    print('%s at %s' % (errorStatus.prettyPrint(),
                        errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
else:
    for varBind in varBinds:
        print(' = '.join([x.prettyPrint() for x in varBind]))


Discover SNMPv3 SecurityEngineId

Send SNMP GET request using the following scenario and options:

  • try to communicate with a SNMPv3 Engine using:
      • a non-existing user
      • over IPv4/UDP
      • to an Agent at demo.snmplabs.com:161
  • if remote SNMP Engine ID is discovered, send SNMP GET request:
      • with SNMPv3, user ‘usr-md5-none’, MD5 authentication, no privacy, at discovered securityEngineId
      • to the same SNMP Engine ID
      • for an OID in text form

from pysnmp.hlapi import *

snmpEngine = SnmpEngine()

transportTarget = UdpTransportTarget(('demo.snmplabs.com', 161))

#
# To discover the remote SNMP EngineID we tap into the SNMP engine's inner
# workings by registering an observer at an execution point inside the
# INTERNAL-class PDU processing
#

observerContext = {}

# Register a callback to be invoked at the specified execution point of the
# SNMP Engine; it receives the local variables in scope at that point
snmpEngine.observer.registerObserver(
    lambda e, p, v, c: c.update(securityEngineId=v['securityEngineId']),
    'rfc3412.prepareDataElements:internal',
    cbCtx=observerContext
)

# Send probe SNMP request with invalid credentials

authData = UsmUserData('non-existing-user')

errorIndication, errorStatus, errorIndex, varBinds = next(
    getCmd(snmpEngine, authData, transportTarget, ContextData(),
           ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)))
)

# See if our SNMP engine received REPORT PDU containing securityEngineId

if 'securityEngineId' not in observerContext:
    print("Can't discover peer EngineID, errorIndication: %s" % errorIndication)
    raise Exception()

securityEngineId = observerContext.pop('securityEngineId')

print('Remote securityEngineId = %s' % securityEngineId.prettyPrint())

#
# Query remote SNMP Engine using usmUserTable entry configured for it
#

authData = UsmUserData('usr-md5-none', 'authkey1',
                       securityEngineId=securityEngineId)

errorIndication, errorStatus, errorIndex, varBinds = next(
    getCmd(snmpEngine,
           authData,
           transportTarget,
           ContextData(),
           ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')))
)

if errorIndication:
    print(errorIndication)
elif errorStatus:
    print('%s at %s' % (errorStatus.prettyPrint(),
                        errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
else:
    for name, val in varBinds:
        print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))
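
The observer in the script above is written as a lambda taking four positional arguments. Written out as a named function it might look like the sketch below; the parameter names are chosen here for readability, and the callback is exercised with stand-in values rather than a live SNMP engine.

```python
def store_engine_id(snmpEngine, execpoint, variables, cbCtx):
    """Observer callback: remember the peer's securityEngineId, if present."""
    engine_id = variables.get('securityEngineId')
    if engine_id is not None:
        cbCtx['securityEngineId'] = engine_id


# Stand-in arguments: no engine, a plain dict for the local variables
context = {}
store_engine_id(None, 'rfc3412.prepareDataElements:internal',
                {'securityEngineId': b'\x80\x00\x4f\xb8'}, context)
print(context)

# A call without the key leaves the context untouched
empty = {}
store_engine_id(None, 'rfc3412.prepareDataElements:internal', {}, empty)
print(empty)
```

Such a function could be passed to snmpEngine.observer.registerObserver in place of the lambda, with the same execution point and cbCtx arguments.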


Query Agents from multiple threads

Send a bunch of SNMP GET requests simultaneously using the following options:

  • process 5 GET requests in 3 parallel threads
  • with SNMPv1, community ‘public’ and with SNMPv2c, community ‘public’ and with SNMPv3, user ‘usr-md5-des’, MD5 auth and DES privacy
  • over IPv4/UDP and over IPv6/UDP
  • to an Agent at demo.snmplabs.com:161 and to an Agent at [::1]:161
  • for instances of SNMPv2-MIB::sysDescr.0 and SNMPv2-MIB::sysLocation.0 MIB objects

from sys import version_info
from threading import Thread
from pysnmp.hlapi import *

if version_info[0] == 2:
    from Queue import Queue
else:
    from queue import Queue

# List of targets in the following format:
# ( ( authData, transportTarget, varNames ), ... )
targets = (
    # 1-st target (SNMPv1 over IPv4/UDP)
    (CommunityData('public', mpModel=0),
     UdpTransportTarget(('demo.snmplabs.com', 161)),
     (ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
      ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))),
    # 2-nd target (SNMPv2c over IPv4/UDP)
    (CommunityData('public'),
     UdpTransportTarget(('demo.snmplabs.com', 161)),
     (ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
      ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))),
    # 3-rd target (SNMPv2c over IPv4/UDP) - same community and
    # different transport address.
    (CommunityData('public'),
     UdpTransportTarget(('localhost', 161)),
     (ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysContact', 0)),
      ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysName', 0)))),
    # 4-th target (SNMPv3 over IPv4/UDP)
    (UsmUserData('usr-md5-des', 'authkey1', 'privkey1'),
     UdpTransportTarget(('demo.snmplabs.com', 161)),
     (ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
      ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))),
    # 5-th target (SNMPv3 over IPv6/UDP)
    (UsmUserData('usr-md5-none', 'authkey1'),
     Udp6TransportTarget(('::1', 161)),
     (ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysDescr', 0)),
      ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysLocation', 0)))),
    # N-th target
    # ...
)


class Worker(Thread):
    def __init__(self, requests, responses):
        Thread.__init__(self)
        self.snmpEngine = SnmpEngine()
        self.requests = requests
        self.responses = responses
        self.daemon = True  # setDaemon() is deprecated in recent Python 3
        self.start()

    def run(self):
        while True:
            authData, transportTarget, varBinds = self.requests.get()
            self.responses.append(
                next(getCmd(self.snmpEngine,
                     authData, transportTarget, ContextData(), *varBinds))
            )
            if hasattr(self.requests, 'task_done'):  # 2.5+
                self.requests.task_done()


class ThreadPool(object):
    def __init__(self, num_threads):
        self.requests = Queue(num_threads)
        self.responses = []
        for _ in range(num_threads):
            Worker(self.requests, self.responses)

    def addRequest(self, authData, transportTarget, varBinds):
        self.requests.put((authData, transportTarget, varBinds))

    def getResponses(self):
        return self.responses

    def waitCompletion(self):
        if hasattr(self.requests, 'join'):
            self.requests.join()  # 2.5+
        else:
            from time import sleep
            # this is a lame substitute for missing .join()
            # adding an explicit synchronization might be a better solution
            while not self.requests.empty():
                sleep(1)


pool = ThreadPool(3)

# Submit GET requests
for authData, transportTarget, varBinds in targets:
    pool.addRequest(authData, transportTarget, varBinds)

# Wait for responses or errors
pool.waitCompletion()

# Walk through responses
for errorIndication, errorStatus, errorIndex, varBinds in pool.getResponses():
    if errorIndication:
        print(errorIndication)
    elif errorStatus:
        print('%s at %s' % (errorStatus.prettyPrint(),
                            errorIndex and varBinds[int(errorIndex) - 1][0] or '?'))
    else:
        for varBind in varBinds:
            print(' = '.join([x.prettyPrint() for x in varBind]))
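
On Python 3 the hand-written pool above could be replaced with concurrent.futures. The sketch below shows only the threading pattern, assuming Python 3.2 or later; `fake_get` is a hypothetical stand-in for next(getCmd(...)) that fabricates its answers, and the per-thread `engine` object mimics the one-SnmpEngine-per-worker arrangement of the script.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()


def fake_get(target):
    """Stand-in for next(getCmd(...)); returns the usual 4-tuple."""
    # One "engine" per worker thread, created lazily on first use
    if not hasattr(_local, 'engine'):
        _local.engine = object()
    host, oids = target
    var_binds = [(oid, 'value from %s' % host) for oid in oids]
    return None, 0, 0, var_binds


# Simplified targets: (host, OIDs) instead of authData/transportTarget
targets = [
    ('demo.snmplabs.com', ('sysDescr.0', 'sysLocation.0')),
    ('demo.snmplabs.com', ('sysDescr.0', 'sysLocation.0')),
    ('localhost', ('sysContact.0', 'sysName.0')),
    ('demo.snmplabs.com', ('sysDescr.0', 'sysLocation.0')),
    ('::1', ('sysDescr.0', 'sysLocation.0')),
]

# 5 requests processed by 3 worker threads; map() keeps the input order
with ThreadPoolExecutor(max_workers=3) as pool:
    responses = list(pool.map(fake_get, targets))

for errorIndication, errorStatus, errorIndex, varBinds in responses:
    for name, value in varBinds:
        print('%s = %s' % (name, value))
```

Leaving the with block waits for all submitted work, which takes the place of waitCompletion() in the script above.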


See also: library reference.