Python-Flask:编写自动化连接demo脚本:v1.0.0
作者:小教学发布时间:2023-10-01分类:程序开发学习浏览:65
导读:主函数:#_*_Coding:UTF-8_*_#@Time:13:14#@Author:YYZ#@File:Flask#@Project...
主函数:
# _*_ Coding : UTF-8 _*_
# @Time : 13:14
# @Author : YYZ
# @File : Flask
# @Project : Python_Project_爬虫
import json
from flask import Flask,request,jsonify
import ssh
api = Flask(__name__)
# methods: 指定请求方式
'''
接口解析参数
host = host_info["host"]
port = host_info["port"]
service = host_info["service"]
user = host_info["user"]
pwd = host_info["pwd"]
'''
@api.route('/',methods=['POST'])
def install():
# 请求方式为post时,可以使用 request.get_json()接收到JSON数据
try:
#host_info = request.get_json() # 获取 POST 请求中的 JSON 数据
host_info = request.get_data()
# 如果得到的data是字符串格式,则需要用json.loads来变换成python格式,看个人需求
host_info = json.loads(host_info)
print(host_info)
except Exception as e:
return jsonify({'error': '请求数据失败'}), 400
# 处理数据
# 调用do_something_with_data函数来处理接收到的数据。
try:
connect = ssh.Sshclass(host_info["host"], host_info["user"], host_info["port"]) # 端口,用户,ssh端口
connect.conn_by_pwd(host_info["pwd"]) # 输入密码,进行登录
except Exception as e:
return jsonify({'error': '连接失败'}), 888
if host_info["cmd"] :
try:
command_res = str(connect.exec_commond(host_info["cmd"])) #执行命令
print(command_res)
except Exception as e:
return jsonify({'error': '执行失败'}), 888
if host_info["file-determine"] == "yes":
try:
file_res = connect.upload_file(host_info["local_path"],host_info["remote_path"],host_info["file_name"])
print(host_info["file_name"]+str(file_res))
except Exception as e:
return jsonify({'error': '上传失败'}), 888
return "操作完成"
if __name__ == '__main__':
api.run(host='0.0.0.0', port=8080, debug=True)
Ssh连接部分:
import paramiko
import json
'''
ssh 连接对象
本对象提供密钥连接、命令执行、关闭连接
'''
class Sshclass(object):
# ip = ''
# port = 22
# username = ''
# timeout = 0
# ssh = None
def __init__(self,ip,username,port=22,timeout=30):
'''
初始化ssh对象
:param ip: 主机IP
:param username: 登录用户名
:param port: ssh端口号
:param timeout: 连接超时
:return:
'''
self.ip = ip
self.username = username
self.timeout = timeout
self.port = port
'''
SSHClient作用类似于Linux的ssh命令,是对SSH会话的封装,该类封装了传输(Transport),通道(Channel)及SFTPClient建立的方法(open_sftp),通常用于执行远程命令。
Paramiko中的几个基础名词:
1、Channel:是一种类Socket,一种安全的SSH传输通道;
2、Transport:是一种加密的会话,使用时会同步创建了一个加密的Tunnels(通道),这个Tunnels叫做Channel;
3、Session:是client与Server保持连接的对象,用connect()/start_client()/start_server()开始会话。
'''
ssh = paramiko.SSHClient()
#远程主机没有本地主机密钥或HostKeys对象时的连接方法,需要配置
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh = ssh
def conn_by_key(self,key):
'''
密钥连接
:param key: str rsa密钥路径
:return: ssh连接对象
'''
rsa_key = paramiko.RSAKey.from_private_key(key)
self.ssh.connect(hostname=self.ip,port=self.port,username=self.username,pkey=rsa_key,timeout=self.timeout)
if self.ssh:
print('密钥连接成功')
else:
self.close()
raise Exception('密钥连接失败')
def conn_by_pwd(self,pwd):
'''
密码连接
:param pwd: 登录密码
:return: ssh连接对象
'''
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.ssh.connect(hostname=self.ip,port=self.port,username=self.username,password=pwd)
if self.ssh:
print('密码连接成功')
else:
self.close()
raise Exception('密码连接失败')
def exec_commond(self,command):
'''
命令控制
:param commond: 命令
:return: 返回结构
'''
if command:
stdin, stdout, stderr = self.ssh.exec_command(command)
return {
"stdin":command,
"stdout":stdout.read(),
"stderr":stderr.read()
}
else:
self.close()
raise Exception("命令不能为空")
def close(self):
'''
关闭当前连接
:return:
'''
if self.ssh:
self.ssh.close()
else:
raise Exception("ssh关闭连接失败,当前对象没有ssh连接。")
def upload_file(self,local_path,remote_path,file_name):
# sftp_link = paramiko.Transport(self.ip,self.port)
# sftp_link.connect(username=self.username,password=pwd)
# sftp = paramiko.SFTPClient.from_transport(sftp_link)
sftp = self.ssh.open_sftp()
try:
sftp.put(local_path+"\\"+file_name, remote_path+"/"+file_name)
#print("上传成功")
return ("上传成功"), 200
except Exception as e:
return {'error': '上传失败'}, 888
finally:
sftp.close()
self.close()
if __name__ == '__main__':
ssh = Sshclass('192.168.115.23','root', port=22)
pwd = "123456"
local_path = 'D:\PyChrom\Python_Flask\自动化接口--Flask'
remote_path = "/opt"
file_name = 'file.py'
ssh.conn_by_pwd(pwd)
#res = str(ssh.exec_commond("ls /"))
#print(res)
res = ssh.upload_file(local_path,remote_path,file_name)
print(res)
接口调试
后续优化思路:
目前只是远程连接+文件上传,后续会继续优化
弄个公共的nfs,平常一些脚本和包会放到这个nfs里,脚本或包自动从nfs里拉,然后执行脚本,即可部署,包括多机部署。
- 上一篇:单调队列 - 滑动窗口
- 下一篇:区块链(6):p2p去中心化介绍
- 程序开发学习排行
-
- 1鸿蒙HarmonyOS:Web组件网页白屏检测
- 2HTTPS协议是安全传输,为啥还要再加密?
- 3HarmonyOS鸿蒙应用开发——数据持久化Preferences
- 4记解决MaterialButton背景颜色与设置值不同
- 5鸿蒙HarmonyOS实战-ArkUI组件(RelativeContainer)
- 6鸿蒙HarmonyOS实战-ArkUI组件(Stack)
- 7鸿蒙HarmonyOS实战-ArkUI组件(GridRow/GridCol)
- 8[Android][NDK][Cmake]一文搞懂Android项目中的Cmake
- 9鸿蒙HarmonyOS实战-ArkUI组件(mediaquery)
- 最近发表
-
- WooCommerce最好的WordPress常用插件下载博客插件模块的相关产品
- 羊驼机器人最好的WordPress常用插件下载博客插件模块
- IP信息记录器最好的WordPress常用插件下载博客插件模块
- Linkly for WooCommerce最好的WordPress常用插件下载博客插件模块
- 元素聚合器Forms最好的WordPress常用插件下载博客插件模块
- Promaker Chat 最好的WordPress通用插件下载 博客插件模块
- 自动更新发布日期最好的WordPress常用插件下载博客插件模块
- WordPress官方最好的获取回复WordPress常用插件下载博客插件模块
- Img to rss最好的wordpress常用插件下载博客插件模块
- WPMozo为Elementor最好的WordPress常用插件下载博客插件模块添加精简版