Mentions légales du service

Skip to content
Snippets Groups Projects
Unverified Commit e575775a authored by Kirk Byers's avatar Kirk Byers Committed by GitHub
Browse files

Merge pull request #737 from ktbyers/develop

Netmiko release 2.1.0
parents 2f60d4d5 729798ce
No related branches found
No related tags found
No related merge requests found
Showing
with 311 additions and 380 deletions
cisco_881 = {
'device_type': 'cisco_ios',
'ip': '10.10.10.227',
'username': 'test1',
'password': 'password',
'secret': 'secret',
'verbose': False,
}
cisco_asa = {
'device_type': 'cisco_asa',
'ip': '10.10.10.226',
'username': 'admin',
'password': 'password',
'secret': 'secret',
'verbose': False,
}
arista_veos_sw1 = {
'device_type': 'arista_eos',
'ip': '10.10.10.227',
'username': 'admin1',
'password': 'password',
'secret': '',
'port': 8222,
'verbose': False,
}
arista_veos_sw2 = {
'device_type': 'arista_eos',
'ip': '10.10.10.227',
'username': 'admin1',
'password': 'password',
'secret': '',
'port': 8322,
'verbose': False,
}
arista_veos_sw3 = {
'device_type': 'arista_eos',
'ip': '10.10.10.227',
'username': 'admin1',
'password': 'password',
'secret': '',
'port': 8422,
'verbose': False,
}
arista_veos_sw4 = {
'device_type': 'arista_eos',
'ip': '10.10.10.227',
'username': 'admin1',
'password': 'password',
'secret': '',
'port': 8522,
'verbose': False,
}
hp_procurve = {
'device_type': 'hp_procurve',
'ip': '10.10.10.227',
'username': 'admin',
'password': 'password',
'secret': '',
'port': 9922,
'verbose': False,
}
hp_comware = {
'device_type': 'hp_comware',
'ip': '192.168.112.11',
'username': 'admin',
'password': 'admin',
'port': 22,
'verbose': False,
}
brocade_vdx = {
'device_type': 'brocade_vdx',
'ip': '10.254.8.8',
'username': 'admin',
'password': 'password',
'port': 22,
'verbose': False,
}
all_devices = [
cisco_881,
cisco_asa,
arista_veos_sw1,
arista_veos_sw2,
arista_veos_sw3,
arista_veos_sw4,
hp_procurve,
hp_comware,
brocade_vdx,
]
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
my_device = {
'host': "host.domain.com",
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
# Increase (essentially) all sleeps by a factor of 2
'global_delay_factor': 2,
}
net_connect = Netmiko(**my_device)
# Increase the sleeps for just send_command by a factor of 2
output = net_connect.send_command("show ip int brief", delay_factor=2)
print(output)
net_connect.disconnect()
#!/usr/bin/env python
"""Script to upgrade a Cisco ASA."""
import sys
from __future__ import print_function
from datetime import datetime
from getpass import getpass
from netmiko import ConnectHandler, FileTransfer
def asa_scp_handler(ssh_conn, cmd='ssh scopy enable', mode='enable'):
"""Enable/disable SCP on Cisco ASA."""
if mode == 'disable':
cmd = 'no ' + cmd
return ssh_conn.send_config_set([cmd])
def main():
"""Script to upgrade a Cisco ASA."""
ip_addr = raw_input("Enter ASA IP address: ")
try:
ip_addr = raw_input("Enter ASA IP address: ")
except NameError:
ip_addr = input("Enter ASA IP address: ")
my_pass = getpass()
start_time = datetime.now()
print ">>>> {}".format(start_time)
print(">>>> {}".format(start_time))
net_device = {
'device_type': 'cisco_asa',
......@@ -27,16 +32,15 @@ def main():
'port': 22,
}
print "\nLogging in to ASA"
print("\nLogging in to ASA")
ssh_conn = ConnectHandler(**net_device)
print
print()
# ADJUST TO TRANSFER IMAGE FILE
dest_file_system = 'disk0:'
source_file = 'test1.txt'
dest_file = 'test1.txt'
alt_dest_file = 'asa825-59-k8.bin'
scp_changed = False
with FileTransfer(ssh_conn, source_file=source_file, dest_file=dest_file,
file_system=dest_file_system) as scp_transfer:
......@@ -45,42 +49,43 @@ def main():
if not scp_transfer.verify_space_available():
raise ValueError("Insufficient space available on remote device")
print "Enabling SCP"
print("Enabling SCP")
output = asa_scp_handler(ssh_conn, mode='enable')
print output
print(output)
print "\nTransferring file\n"
print("\nTransferring file\n")
scp_transfer.transfer_file()
print "Disabling SCP"
print("Disabling SCP")
output = asa_scp_handler(ssh_conn, mode='disable')
print output
print(output)
print "\nVerifying file"
print("\nVerifying file")
if scp_transfer.verify_file():
print "Source and destination MD5 matches"
print("Source and destination MD5 matches")
else:
raise ValueError("MD5 failure between source and destination files")
print "\nSending boot commands"
print("\nSending boot commands")
full_file_name = "{}/{}".format(dest_file_system, alt_dest_file)
boot_cmd = 'boot system {}'.format(full_file_name)
output = ssh_conn.send_config_set([boot_cmd])
print output
print(output)
print "\nVerifying state"
print("\nVerifying state")
output = ssh_conn.send_command('show boot')
print output
print(output)
# UNCOMMENT TO PERFORM WR MEM AND RELOAD
#print "\nWrite mem and reload"
#output = ssh_conn.send_command_expect('write mem')
#output += ssh_conn.send_command('reload')
#output += ssh_conn.send_command('y')
#print output
print "\n>>>> {}".format(datetime.now() - start_time)
print
# print("\nWrite mem and reload")
# output = ssh_conn.send_command_expect('write mem')
# output += ssh_conn.send_command('reload')
# output += ssh_conn.send_command('y')
# print(output)
print("\n>>>> {}".format(datetime.now() - start_time))
print()
if __name__ == "__main__":
main()
#!/usr/bin/env python
'''
Test Netmiko on Cisco TelePresence Video device (C, SX, DX, EX, MX)
'''
from netmiko import ConnectHandler, cisco
mydevice = {
'device_type': 'cisco_tp',
'ip': '192.168.105.1',
'username': 'admin',
'password': 'Tandberg',
'verbose':True
}
ssh_conn = ConnectHandler(**mydevice)
print( "\n\n")
command_list = ['help', 'whoami', 'whoaami', 'echo test', 'xconfig']
#command_list = ['help', 'whoami', 'whoaami', 'echo test']
ssh_conn = ConnectHandler(**mydevice)
print( "\n\n")
for command in command_list:
print('>>> running command : ' + command)
output = ssh_conn.send_command(command)
print('Result = ' + output + '\n')
\ No newline at end of file
logging buffered 8000
logging console
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
my_device = {
'host': "host.domain.com",
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
}
net_connect = Netmiko(**my_device)
cfg_commands = ['logging buffered 10000', 'no logging console']
# send_config_set() will automatically enter/exit config mode
output = net_connect.send_config_set(cfg_commands)
print(output)
net_connect.disconnect()
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
my_device = {
'host': "host.domain.com",
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
}
net_connect = Netmiko(**my_device)
# Make configuration changes using an external file
output = net_connect.send_config_from_file("change_file.txt")
print(output)
net_connect.disconnect()
#!/usr/bin/env python
"""
This example is serial (i.e. no concurrency). Connect to one device, after the other,
after the other.
"""
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
password = getpass()
cisco1 = {
'host': "host1.domain.com",
'username': 'pyclass',
'password': password,
'device_type': 'cisco_ios',
}
arista1 = {
'host': "host2.domain.com",
'username': 'pyclass',
'password': password,
'device_type': 'arista_eos',
}
srx1 = {
'host': "host3.domain.com",
'username': 'pyclass',
'password': password,
'device_type': 'juniper_junos',
}
for device in (cisco1, arista1, srx1):
net_connect = Netmiko(**device)
print(net_connect.find_prompt())
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
my_device = {
'host': "host.domain.com",
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
}
net_connect = Netmiko(**my_device)
# Ensure in enable mode
net_connect.enable()
print(net_connect.find_prompt())
net_connect.disconnect()
#!/usr/bin/env python
"""Handling commands that prompt for additional information."""
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
my_device = {
'host': "host.domain.com",
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
}
"""
Cisco IOS behavior on file delete:
pynet-rtr1# delete flash:/small_file_bim.txt
Delete flash:/test1.txt? [confirm]y
pynet-rtr1
"""
net_connect = Netmiko(**my_device)
filename = "text1234.txt"
cmd = "delete flash:{}".format(filename)
# send_command_timing as the router prompt is not returned
output = net_connect.send_command_timing(cmd, strip_command=False, strip_prompt=False)
if 'confirm' in output:
output += net_connect.send_command_timing("\n", strip_command=False, strip_prompt=False)
net_connect.disconnect()
print(output)
'''
Requires paramiko >=1.8.0 (paramiko had an issue with multiprocessing prior
to this)
Example code showing how to use netmiko for multiprocessing. Create a
separate process for each ssh connection. Each subprocess executes a
'show version' command on the remote device. Use a multiprocessing.queue to
pass data from subprocess to parent process.
Only supports Python2
'''
# Catch Paramiko warnings about libgmp and RandomPool
import warnings
with warnings.catch_warnings(record=True) as w:
import paramiko
import multiprocessing
import time
from datetime import datetime
import netmiko
from netmiko.ssh_exception import NetMikoTimeoutException, NetMikoAuthenticationException
# DEVICE_CREDS contains the devices to connect to
from DEVICE_CREDS import all_devices
def print_output(results):
print "\nSuccessful devices:"
for a_dict in results:
for identifier,v in a_dict.iteritems():
(success, out_string) = v
if success:
print '\n\n'
print '#' * 80
print 'Device = {0}\n'.format(identifier)
print out_string
print '#' * 80
print "\n\nFailed devices:\n"
for a_dict in results:
for identifier,v in a_dict.iteritems():
(success, out_string) = v
if not success:
print 'Device failed = {0}'.format(identifier)
print "\nEnd time: " + str(datetime.now())
print
def worker_show_version(a_device, mp_queue):
'''
Return a dictionary where the key is the device identifier
Value is (success|fail(boolean), return_string)
'''
try:
a_device['port']
except KeyError:
a_device['port'] = 22
identifier = '{ip}:{port}'.format(**a_device)
return_data = {}
show_ver_command = 'show version'
SSHClass = netmiko.ssh_dispatcher(a_device['device_type'])
try:
net_connect = SSHClass(**a_device)
show_version = net_connect.send_command(show_ver_command)
except (NetMikoTimeoutException, NetMikoAuthenticationException) as e:
return_data[identifier] = (False, e)
# Add data to the queue (for parent process)
mp_queue.put(return_data)
return None
return_data[identifier] = (True, show_version)
mp_queue.put(return_data)
def main():
mp_queue = multiprocessing.Queue()
processes = []
print "\nStart time: " + str(datetime.now())
for a_device in all_devices:
p = multiprocessing.Process(target=worker_show_version, args=(a_device, mp_queue))
processes.append(p)
# start the work process
p.start()
# retrieve all the data from the queue
results = []
while any(p.is_alive() for p in processes):
time.sleep(0.1)
while not mp_queue.empty():
results.append(mp_queue.get())
# wait until the child processes have completed
for p in processes:
p.join()
print_output(results)
if __name__ == '__main__':
main()
#!/usr/bin/env python
'''
Cisco IOS only
Requires scp https://github.com/jbardin/scp.py
'''
from netmiko import ConnectHandler, SCPConn
from SECRET_DEVICE_CREDS import cisco_881
def main():
'''
SCP transfer cisco_logging.txt to network device
Use ssh_conn as ssh channel into network device
scp_conn must be closed after file transfer
'''
ssh_conn = ConnectHandler(**cisco_881)
scp_conn = SCPConn(ssh_conn)
s_file = 'cisco_logging.txt'
d_file = 'cisco_logging.txt'
print "\n\n"
scp_conn.scp_transfer_file(s_file, d_file)
scp_conn.close()
output = ssh_conn.send_command("show flash: | inc cisco_logging")
print ">> " + output + '\n'
# Disable file copy confirmation
output = ssh_conn.send_config_set(["file prompt quiet"])
# Execute config merge
print "Performing config merge\n"
output = ssh_conn.send_command("copy flash:cisco_logging.txt running-config")
# Verify change
print "Verifying logging buffer change"
output = ssh_conn.send_command("show run | inc logging buffer")
print ">> " + output + '\n'
# Restore copy confirmation
output = ssh_conn.send_config_set(["file prompt alert"])
if __name__ == "__main__":
main()
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
my_device = {
'host': 'host.domain.com',
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
}
net_connect = Netmiko(**my_device)
output = net_connect.send_command("show ip int brief")
print(output)
net_connect.disconnect()
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
my_device = {
'host': 'host.domain.com',
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
}
net_connect = Netmiko(**my_device)
# Requires ntc-templates to be installed in ~/ntc-templates/templates
output = net_connect.send_command("show ip int brief", use_textfsm=True)
print(output)
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
net_connect = Netmiko(host='host.domain.com', username='pyclass',
password=getpass(), device_type='cisco_ios')
print(net_connect.find_prompt())
net_connect.disconnect()
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
# Netmiko is the same as ConnectHandler
from netmiko import Netmiko
from getpass import getpass
my_device = {
'host': 'host.domain.com',
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
}
net_connect = Netmiko(**my_device)
print(net_connect.find_prompt())
net_connect.disconnect()
#!/usr/bin/env python
from __future__ import print_function, unicode_literals
import logging
from netmiko import Netmiko
from getpass import getpass
# This will create a file named 'test.log' in your current directory.
# It will log all reads and writes on the SSH channel.
logging.basicConfig(filename='test.log', level=logging.DEBUG)
logger = logging.getLogger("netmiko")
my_device = {
'host': 'host.domain.com',
'username': 'pyclass',
'password': getpass(),
'device_type': 'cisco_ios',
}
net_connect = Netmiko(**my_device)
output = net_connect.send_command("show ip int brief")
print(output)
net_connect.disconnect()
......@@ -16,17 +16,18 @@ from netmiko.ssh_exception import NetMikoTimeoutException
from netmiko.ssh_exception import NetMikoAuthenticationException
from netmiko.ssh_autodetect import SSHDetect
from netmiko.base_connection import BaseConnection
from netmiko.scp_functions import file_transfer
# Alternate naming
NetmikoTimeoutError = NetMikoTimeoutException
NetmikoAuthError = NetMikoAuthenticationException
Netmiko = ConnectHandler
__version__ = '2.0.2'
__version__ = '2.1.0'
__all__ = ('ConnectHandler', 'ssh_dispatcher', 'platforms', 'SCPConn', 'FileTransfer',
'NetMikoTimeoutException', 'NetMikoAuthenticationException',
'NetmikoTimeoutError', 'NetmikoAuthError', 'InLineTransfer', 'redispatch',
'SSHDetect', 'BaseConnection', 'Netmiko')
'SSHDetect', 'BaseConnection', 'Netmiko', 'file_transfer')
# Cisco cntl-shift-six sequence
CNTL_SHIFT_6 = chr(30)
from __future__ import unicode_literals
import time
import re
from netmiko.cisco_base_connection import CiscoSSHConnection
from netmiko.cisco_base_connection import CiscoFileTransfer
from netmiko import log
......@@ -35,43 +34,27 @@ class AristaSSH(CiscoSSHConnection):
log.debug("check_config_mode: {0}".format(repr(output)))
return check_string in output
def _enter_shell(self):
"""Enter the Bourne Shell."""
return self.send_command('bash', expect_string=r"[\$#]")
def _return_cli(self):
"""Return to the CLI."""
return self.send_command('exit', expect_string=r"[#>]")
class AristaFileTransfer(CiscoFileTransfer):
"""Arista SCP File Transfer driver."""
def __init__(self, ssh_conn, source_file, dest_file, file_system=None, direction='put'):
msg = "Arista SCP Driver is under development and not fully implemented"
raise NotImplementedError(msg)
self.ssh_ctl_chan = ssh_conn
self.source_file = source_file
self.dest_file = dest_file
self.direction = direction
if file_system:
self.file_system = file_system
else:
raise ValueError("Destination file system must be specified for Arista")
# if direction == 'put':
# self.source_md5 = self.file_md5(source_file)
# self.file_size = os.stat(source_file).st_size
# elif direction == 'get':
# self.source_md5 = self.remote_md5(remote_file=source_file)
# self.file_size = self.remote_file_size(remote_file=source_file)
# else:
# raise ValueError("Invalid direction specified")
def put_file(self):
"""SCP copy the file from the local system to the remote device."""
destination = "{}/{}".format(self.file_system, self.dest_file)
self.scp_conn.scp_transfer_file(self.source_file, destination)
# Must close the SCP connection to get the file written (flush)
self.scp_conn.close()
def remote_space_available(self, search_pattern=r"(\d+) bytes free"):
def __init__(self, ssh_conn, source_file, dest_file, file_system="/mnt/flash", direction='put'):
return super(AristaFileTransfer, self).__init__(ssh_conn=ssh_conn,
source_file=source_file,
dest_file=dest_file,
file_system=file_system,
direction=direction)
def remote_space_available(self, search_pattern=""):
"""Return space available on remote device."""
return super(AristaFileTransfer, self).remote_space_available(
search_pattern=search_pattern
)
return self._remote_space_available_unix(search_pattern=search_pattern)
def verify_space_available(self, search_pattern=r"(\d+) bytes free"):
"""Verify sufficient space is available on destination file system (return boolean)."""
......@@ -81,45 +64,22 @@ class AristaFileTransfer(CiscoFileTransfer):
def check_file_exists(self, remote_cmd=""):
"""Check if the dest_file already exists on the file system (return boolean)."""
raise NotImplementedError
return self._check_file_exists_unix(remote_cmd=remote_cmd)
def remote_file_size(self, remote_cmd="", remote_file=None):
"""Get the file size of the remote file."""
if remote_file is None:
if self.direction == 'put':
remote_file = self.dest_file
elif self.direction == 'get':
remote_file = self.source_file
if not remote_cmd:
remote_cmd = "dir {}/{}".format(self.file_system, remote_file)
remote_out = self.ssh_ctl_chan.send_command(remote_cmd)
# Match line containing file name
escape_file_name = re.escape(remote_file)
pattern = r".*({}).*".format(escape_file_name)
match = re.search(pattern, remote_out)
if match:
file_size = match.group(0)
file_size = file_size.split()[0]
if 'No such file or directory' in remote_out:
raise IOError("Unable to find file on remote system")
else:
return int(file_size)
@staticmethod
def process_md5(md5_output, pattern=r"= (.*)"):
raise NotImplementedError
return self._remote_file_size_unix(remote_cmd=remote_cmd, remote_file=remote_file)
def remote_md5(self, base_cmd='show file', remote_file=None):
def remote_md5(self, base_cmd='verify /md5', remote_file=None):
if remote_file is None:
if self.direction == 'put':
remote_file = self.dest_file
elif self.direction == 'get':
remote_file = self.source_file
remote_md5_cmd = "{} {}{} md5sum".format(base_cmd, self.file_system, remote_file)
return self.ssh_ctl_chan.send_command(remote_md5_cmd, delay_factor=3.0)
remote_md5_cmd = "{} file:{}/{}".format(base_cmd, self.file_system, remote_file)
dest_md5 = self.ssh_ctl_chan.send_command(remote_md5_cmd, max_loops=750, delay_factor=2)
dest_md5 = self.process_md5(dest_md5)
return dest_md5
def enable_scp(self, cmd=None):
raise NotImplementedError
......
......@@ -283,7 +283,7 @@ class BaseConnection(object):
# Try sending IAC + NOP (IAC is telnet way of sending command
# IAC = Interpret as Command (it comes before the NOP)
log.debug("Sending IAC + NOP")
self.device.write_channel(telnetlib.IAC + telnetlib.NOP)
self.write_channel(telnetlib.IAC + telnetlib.NOP)
return True
except AttributeError:
return False
......@@ -367,7 +367,7 @@ class BaseConnection(object):
loop_delay = .1
# Default to making loop time be roughly equivalent to self.timeout (support old max_loops
# argument for backwards compatibility).
if max_loops != 150:
if max_loops == 150:
max_loops = self.timeout / loop_delay
while i < max_loops:
if self.protocol == 'ssh':
......@@ -1062,7 +1062,11 @@ class BaseConnection(object):
def check_config_mode(self, check_string='', pattern=''):
"""Checks if the device is in configuration mode or not."""
self.write_channel(self.RETURN)
output = self.read_until_pattern(pattern=pattern)
# You can encounter an issue here (on router name changes) prefer delay-based solution
if not pattern:
output = self._read_channel_timing()
else:
output = self.read_until_pattern(pattern=pattern)
return check_string in output
def config_mode(self, config_command='', pattern=''):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment