
远程操作、系统信息、定时任务
更新: 2025/2/24 字数: 0 字 时长: 0 分钟
远程操作
paramiko模块提供了ssh以及sft进行远程登录服务器执行命令和上传下载文件的功能。
文件下载
python
import paramiko
# 从服务器上下载文件
def ssh_down(ip, port, username, password, server, local):
"""
:param ip: 服务器IP
:param port: 端口
:param username:用户
:param password: 密码
"""
# 实例化一个transport连接对象
transport = paramiko.Transport((ip, port))
# 建立连接
transport.connect(username=username, password=password)
# 实例化一个sftp对象,指定连接的通道
sftp = paramiko.SFTPClient.from_transport(transport)
# 将历史数据压缩包下载到本地文件中
sftp.get(server, local)
# 关闭transport连接对象
transport.close()
if __name__ == '__main__':
# 服务器文件路径及名称
server = '/路径/文件名.后缀'
# 本地存储路径及名称
local = '盘符:/路径/文件名.后缀'
# 下载文件
ssh_down("服务器IP", 22, "登录的用户名", "连接密码", server, local)
代码部署
执行下面的代码的前提是,公钥已经配置在了服务器上,能通过私钥直连服务器。
python
import time
import paramiko
# 命令流(\n就是确认)
command = (
# 暂停运行的服务(作用等于Ctrl+C)
"\x03\n",
# 扔掉本地分支修改
"git checkout .\n",
# 切换到master分支
"git checkout master\n",
# 删除dev分支
"git branch -D dev\n",
# 拉取线上dev分支到本地为dev分支
"git fetch -f origin dev:dev\n",
# 切换到dev分支
"git checkout dev\n",
)
# 连接服务器执行代命令
try:
# 实例化一个SSH连接对象
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 读取私钥(注意私钥路径)
ssh.connect('服务器IP', port=22, username='root', key_filename='C:/Users/用户/.ssh/id_rsa')
# 援引shell
chan = ssh.invoke_shell()
# 遍历命令流
for com in command:
# 发送执行命令
chan.send(com)
# 暂停一段时间,等待命令响应
time.sleep(5)
# 非必须,接受返回消息
print(chan.recv(20000).decode())
# 关闭SSH连接对象
ssh.close()
except Exception as e:
print(e)
SSH链接
SSH 为 Secure Shell 的缩写,由 IETF 的网络小组(Network Working Group)所制定;SSH 为建立在应用层基础上的安全协议。SSH 是较可靠,专为远程登录会话和其他网络服务提供安全性的协议。利用 SSH 协议可以有效防止远程管理过程中的信息泄露问题。
以往我们访问数据库的方式都是直连的,也就是直接访问数据库。**但在项目当中,处于安全的考虑,很多情况下线上的数据库都不允许直接访问,大多是通过SSH链接访问。**例如:数据库在A机器,但只能通过B机器来访问,C机器想要访问A机器的数据库,就只能链接B机器再链接A机器访问数据库了。在Python中可以使用 mysqldb
模块通过ssh隧道来链接线上的MySQL数据库:
python
import pymysql
from sshtunnel import SSHTunnelForwarder
class Spider_Mail(object):
def __init__(self):
self.server = None
self.db = None
self.cursor = None
def connect_mysql(self):
try:
# 通过SSH链接到可访问数据库的主机上
self.server = SSHTunnelForwarder(
('0.0.0.0', 22), # 注释:SSH主机的IP地址和开放端口
ssh_username="username", # 注释:SSH的链接的用户名
ssh_password="password", # 注释:SSH的链接的密码
remote_bind_address=('0.0.0.0', 3306)) # 注释:数据库所在的IP和端口
self.server.start()
# 选择要连接的数据库
self.db = pymysql.connect(host="127.0.0.1",
port=3306, # 注释:数据库端口
user="user", # 注释:数据库账号
passwd="passwd", # 注释:数据库密码
db="database") # 注释:访问特定的数据库
self.cursor = self.db.cursor()
print('-'*40+'连接线上数据库成功'+'-'*40)
except Exception as e:
print(f'连接线上数据库出错:{e}')
def main(self):
self.connect_mysql()
if self.server and self.db and self.cursor:
try:
'''
这里执行数据库里面的一些操作
'''
# 关闭连接
self.db.close()
self.server.close()
print('-' * 40 + '关闭线上数据库连接' + '-' * 40)
except Exception as e:
print(f'程序出错:{e}')
假如我们的MySQL数据库在服务器上,且不能通过IP直连,就需要通过SSH链接来连接:
python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sshtunnel import SSHTunnelForwarder
try:
ssh_mysql_server = SSHTunnelForwarder(
('服务器IP地址', 22), # 注释:服务器IP地址
ssh_username="root", # 注释:登录服务器的用户名
ssh_password="密码", # 注释:登录服务器用户名的密码
remote_bind_address=('172.19.216.186', 3306)) # 注释:再跳转到内网186服务器上的3306端口的MySQL
ssh_mysql_server.start()
temp_new_engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/数据库名",
max_overflow=10,
pool_size=10, # 注释:连接池大小
pool_timeout=60, # 注释:池中没有线程最多等待的时间,否则报错
pool_recycle=7200
)
SSH_MYSQL_SESSION = sessionmaker(bind=temp_new_engine)
except Exception as e:
print(e)
系统信息
psutil是一个跨平台库,能够轻松实现获取系统运行的进程和系统利用率(包括CPU、内存、磁盘、网络等)信息,主要应用于系统监控,分析和限制系统资源及进程的管理。
CPU信息
python
# 查看cpu所有信息
psutil.cpu_times()
# 显示cpu所有逻辑信息
psutil.cpu_times(percpu=True)
# 查看用户的cpu时间比
psutil.cpu_times().user
# 查看cpu逻辑个数
psutil.cpu_count()
# 查看cpu物理个数
psutil.cpu_count(logical=False)
内存信息
python
# 系统总计内存
psutil.virtual_memory().total
# 系统已经使用内存
psutil.virtual_memory().used
# 系统空闲内存
psutil.virtual_memory().free
# 获取swap内存信息
psutil.virtual_memory().swap_memory()
磁盘信息
python
# 磁盘利用率使用
psutil.disk_usage()
# 获取磁盘的完整信息(包括)
psutil.disk_partitions()
# 获取硬盘IO总个数(包括read_count(读IO数),write_count(写IO数),read_bytes(IO写字节数),read_time(磁盘读时间),write_time(磁盘写时间))
psutil.disk_io_counters()
# 获取单个分区IO个数
psutil.disk_io_counters(perdisk=True)
# 获取分区表的参数
psutil.disk_usage('/')
网络信息
python
# 获取网络总IO信息(与磁盘IO信息类似,包括byes_sent(发送字节数),byte_recv=xxx(接受字节数),pack-ets_sent=xxx(发送字节数),pack-ets_recv=xxx(接收数据包数))
psutil.net_io_counters()
# 输出网络每个接口信息
psutil.net_io_counters(pernic=True)
# 获取当前系统用户登录信息
psutil.users()
进程信息
python
# 查看系统全部进程
psutil.pids()
# 查看单个进程
p = psutil.Process(进程号)
p.name() # 注释:进程名
p.exe() # 注释:进程的bin路径
p.cwd() # 注释:进程的工作目录绝对路径
p.status() # 注释:进程状态
p.create_time() # 注释:进程创建时间
p.uids() # 注释:进程uid信息
p.gids() # 注释:进程的gid信息
p.cpu_times() # 注释:进程的cpu时间信息,包括user、system两个cpu信息
p.cpu_affinity() # 注释:get进程cpu亲和度,如果要设置cpu亲和度,将cpu号作为参考就好
p.memory_percent() # 注释:进程内存利用率
p.memory_info() # 注释:进程内存rss,vms信息
p.io_counters() # 注释:进程的IO信息,包括读写IO数字及参数
p.connectios() # 注释:返回进程列表
p.num_threads() # 注释:进程开启的线程数
时间信息
python
# 获取开机时间(以linux时间格式返回)
psutil.boot_time()
# 获取开机时间(以自然时间格式返回)
datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H: %M: %S")
定时任务
schedule是一个轻量级的定时任务调度的库,它可以完成每分钟、每小时、每天、每周几、特定日期的定时任务,十分方便我们执行一些轻量级的定时任务。
快速安装
使用下面命令安装Schedule任务调度的库:
pip install schedule
设置任务
使用Schedule设置定时任务:
python
def test_job():
print('时间到了,开始执行')
# 每分钟在第17秒的时候执行任务
schedule.every().minute.at(':17').do(test_job)
# 每10分钟执行一次任务
schedule.every(10).minutes.do(test_job)
# 每小时执行一次任务
schedule.every().hour.do(test_job)
# 每天在10:30时间点执行一次任务
schedule.every().day.at('10:30').do(test_job)
# 每周一执行一次任务
schedule.every().monday.do(test_job)
# 每周一的9:30执行一次任务
schedule.every().monday.at('9:30').do(test_job)
# 在程序最后,设置一个循环,每分钟检查一下任务执行状态
while True:
schedule.run_pending()
time.sleep(60)