Skip to content

远程操作、系统信息、定时任务

更新: 2025/2/24 字数: 0 字 时长: 0 分钟

远程操作

paramiko模块提供了ssh以及sft进行远程登录服务器执行命令和上传下载文件的功能。

文件下载

python
import paramiko

# 从服务器上下载文件
def ssh_down(ip, port, username, password, server, local):
    """
    :param ip: 服务器IP
    :param port: 端口
    :param username:用户
    :param password: 密码
    """
    # 实例化一个transport连接对象
    transport = paramiko.Transport((ip, port))
    # 建立连接
    transport.connect(username=username, password=password)
    # 实例化一个sftp对象,指定连接的通道
    sftp = paramiko.SFTPClient.from_transport(transport)
    # 将历史数据压缩包下载到本地文件中
    sftp.get(server, local)
    # 关闭transport连接对象
    transport.close()

if __name__ == '__main__':
    # 服务器文件路径及名称
    server = '/路径/文件名.后缀'
    # 本地存储路径及名称
    local = '盘符:/路径/文件名.后缀'
    # 下载文件
    ssh_down("服务器IP", 22, "登录的用户名", "连接密码", server, local)

代码部署

执行下面的代码的前提是,公钥已经配置在了服务器上,能通过私钥直连服务器。

python
import time
import paramiko

# 命令流(\n就是确认)
command = (
    # 暂停运行的服务(作用等于Ctrl+C)
    "\x03\n",
    # 扔掉本地分支修改
    "git checkout .\n",
    # 切换到master分支
    "git checkout master\n",
    # 删除dev分支
    "git branch -D dev\n",
    # 拉取线上dev分支到本地为dev分支
    "git fetch -f origin dev:dev\n",
    # 切换到dev分支
    "git checkout dev\n",
)

# 连接服务器执行代命令
try:
    # 实例化一个SSH连接对象
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 读取私钥(注意私钥路径)
    ssh.connect('服务器IP', port=22, username='root', key_filename='C:/Users/用户/.ssh/id_rsa')
    # 援引shell
    chan = ssh.invoke_shell()
    # 遍历命令流
    for com in command:
        # 发送执行命令
        chan.send(com)
        # 暂停一段时间,等待命令响应
        time.sleep(5)
        # 非必须,接受返回消息
        print(chan.recv(20000).decode())
    # 关闭SSH连接对象
    ssh.close()
except Exception as e:
    print(e)

SSH链接

SSH 为 Secure Shell 的缩写,由 IETF 的网络小组(Network Working Group)所制定;SSH 为建立在应用层基础上的安全协议。SSH 是较可靠,专为远程登录会话和其他网络服务提供安全性的协议。利用 SSH 协议可以有效防止远程管理过程中的信息泄露问题。

以往我们访问数据库的方式都是直连的,也就是直接访问数据库。**但在项目当中,处于安全的考虑,很多情况下线上的数据库都不允许直接访问,大多是通过SSH链接访问。**例如:数据库在A机器,但只能通过B机器来访问,C机器想要访问A机器的数据库,就只能链接B机器再链接A机器访问数据库了。在Python中可以使用 mysqldb 模块通过ssh隧道来链接线上的MySQL数据库:

python
import pymysql
from sshtunnel import SSHTunnelForwarder

class Spider_Mail(object):
    def __init__(self):
        self.server = None
        self.db = None
        self.cursor = None

    def connect_mysql(self):
        try:
            # 通过SSH链接到可访问数据库的主机上
            self.server = SSHTunnelForwarder(
                ('0.0.0.0', 22),                        # 注释:SSH主机的IP地址和开放端口
                ssh_username="username",                # 注释:SSH的链接的用户名
                ssh_password="password",                # 注释:SSH的链接的密码
                remote_bind_address=('0.0.0.0', 3306))  # 注释:数据库所在的IP和端口
            self.server.start()
            # 选择要连接的数据库
            self.db = pymysql.connect(host="127.0.0.1",
                                 port=3306,             # 注释:数据库端口
                                 user="user",           # 注释:数据库账号
                                 passwd="passwd",       # 注释:数据库密码
                                 db="database")         # 注释:访问特定的数据库
            self.cursor = self.db.cursor()
            print('-'*40+'连接线上数据库成功'+'-'*40)
        except Exception as e:
            print(f'连接线上数据库出错:{e}')

    def main(self):
        self.connect_mysql()
        if self.server and self.db and self.cursor:
            try:
                '''
                这里执行数据库里面的一些操作
                '''
                # 关闭连接
                self.db.close()
                self.server.close()
                print('-' * 40 + '关闭线上数据库连接' + '-' * 40)
            except Exception as e:
                print(f'程序出错:{e}')

假如我们的MySQL数据库在服务器上,且不能通过IP直连,就需要通过SSH链接来连接:

python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sshtunnel import SSHTunnelForwarder

try:
    ssh_mysql_server = SSHTunnelForwarder(
            ('服务器IP地址', 22),                            # 注释:服务器IP地址
            ssh_username="root",                           # 注释:登录服务器的用户名
            ssh_password="密码",                            # 注释:登录服务器用户名的密码
            remote_bind_address=('172.19.216.186', 3306))  # 注释:再跳转到内网186服务器上的3306端口的MySQL
    ssh_mysql_server.start()
    temp_new_engine = create_engine(
            "mysql+pymysql://root:[email protected]:3306/数据库名",
            max_overflow=10,
            pool_size=10,     # 注释:连接池大小
            pool_timeout=60,  # 注释:池中没有线程最多等待的时间,否则报错
            pool_recycle=7200
        )
    SSH_MYSQL_SESSION = sessionmaker(bind=temp_new_engine)
except Exception as e:
    print(e)

系统信息

psutil是一个跨平台库,能够轻松实现获取系统运行的进程和系统利用率(包括CPU、内存、磁盘、网络等)信息,主要应用于系统监控,分析和限制系统资源及进程的管理。

CPU信息

python
# 查看cpu所有信息
psutil.cpu_times()
# 显示cpu所有逻辑信息
psutil.cpu_times(percpu=True)
# 查看用户的cpu时间比
psutil.cpu_times().user
# 查看cpu逻辑个数
psutil.cpu_count()
# 查看cpu物理个数
psutil.cpu_count(logical=False)

内存信息

python
# 系统总计内存
psutil.virtual_memory().total
# 系统已经使用内存
psutil.virtual_memory().used
# 系统空闲内存
psutil.virtual_memory().free
# 获取swap内存信息
psutil.virtual_memory().swap_memory()

磁盘信息

python
# 磁盘利用率使用
psutil.disk_usage()
# 获取磁盘的完整信息(包括)
psutil.disk_partitions()
# 获取硬盘IO总个数(包括read_count(读IO数),write_count(写IO数),read_bytes(IO写字节数),read_time(磁盘读时间),write_time(磁盘写时间))
psutil.disk_io_counters()
# 获取单个分区IO个数
psutil.disk_io_counters(perdisk=True)
# 获取分区表的参数
psutil.disk_usage('/')

网络信息

python
# 获取网络总IO信息(与磁盘IO信息类似,包括byes_sent(发送字节数),byte_recv=xxx(接受字节数),pack-ets_sent=xxx(发送字节数),pack-ets_recv=xxx(接收数据包数))
psutil.net_io_counters()
# 输出网络每个接口信息
psutil.net_io_counters(pernic=True)
# 获取当前系统用户登录信息
psutil.users()

进程信息

python
# 查看系统全部进程
psutil.pids()
# 查看单个进程
p = psutil.Process(进程号) 
p.name()            # 注释:进程名
p.exe()             # 注释:进程的bin路径
p.cwd()             # 注释:进程的工作目录绝对路径
p.status()          # 注释:进程状态
p.create_time()     # 注释:进程创建时间
p.uids()            # 注释:进程uid信息
p.gids()            # 注释:进程的gid信息
p.cpu_times()       # 注释:进程的cpu时间信息,包括user、system两个cpu信息
p.cpu_affinity()    # 注释:get进程cpu亲和度,如果要设置cpu亲和度,将cpu号作为参考就好
p.memory_percent()  # 注释:进程内存利用率
p.memory_info()     # 注释:进程内存rss,vms信息
p.io_counters()     # 注释:进程的IO信息,包括读写IO数字及参数
p.connectios()      # 注释:返回进程列表
p.num_threads()     # 注释:进程开启的线程数

时间信息

python
# 获取开机时间(以linux时间格式返回)
psutil.boot_time()
# 获取开机时间(以自然时间格式返回)
datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H: %M: %S")

定时任务

schedule是一个轻量级的定时任务调度的库,它可以完成每分钟、每小时、每天、每周几、特定日期的定时任务,十分方便我们执行一些轻量级的定时任务。

快速安装

使用下面命令安装Schedule任务调度的库:

pip install schedule

设置任务

使用Schedule设置定时任务:

python
def test_job():
    print('时间到了,开始执行')

# 每分钟在第17秒的时候执行任务
schedule.every().minute.at(':17').do(test_job)
# 每10分钟执行一次任务
schedule.every(10).minutes.do(test_job)
# 每小时执行一次任务
schedule.every().hour.do(test_job)
# 每天在10:30时间点执行一次任务
schedule.every().day.at('10:30').do(test_job)
# 每周一执行一次任务
schedule.every().monday.do(test_job)
# 每周一的9:30执行一次任务
schedule.every().monday.at('9:30').do(test_job)

# 在程序最后,设置一个循环,每分钟检查一下任务执行状态
while True:
    schedule.run_pending()
    time.sleep(60)