前言:
多喝水,不仅身体好,啥都好。而且不喝水会有惩罚。emmm,肚子有点饿,没东西吃还要写博客也是蛮痛苦。用python实现ssh功能主要运用了socket编程,主要是加强一下自己对最近有些所学知识的运用。原理就不多写了,直接上代码吧!
代码:
实现的功能:
实现多线程服务器,接收客户端的命令,在Windows系统下执行cmd命令实现简单的ssh功能,并记录日志
连接设置:
ADDRESS = "127.0.0.1"
PORT = 2222
客户端:
import socket
from ssh_settings import ADDRESS, PORT
client = socket.socket()
client.connect((ADDRESS, PORT))
while True:
show = ''
command = input(">>>").strip()
if not command:
continue
client.send(command.encode('utf-8'))
result_len = client.recv(1024) #接收内容长度
meg_box = []
for len in range(0,int(result_len),1024):
result = client.recv(1024)
meg_box.append(result.decode('utf-8'))
result = ''.join(meg_box) #拼接数据
print(result)
# result = client.recv(1024) #第二种方法接收数据
# print(result.decode('utf-8'))
# while True:
#
# print(len(result))
# if not result: #分段接收
# print(result.decode('utf-8'))
# result = client.recv(1024)
# else:
# break
client.close()
服务器:
from socket import socket
import os
from ssh_settings import ADDRESS, PORT
from threading import Thread
import logging
from logging_settings import LOGGING_DIC
logging.config.dictConfig(LOGGING_DIC)
logger = logging.getLogger("log1")
def server():
server = socket()
server.bind((ADDRESS, PORT))
server.listen(2)
conn, addr = server.accept()
while True:
print("等待指令...")
try:
logger.info(f"{addr} is connected.")
data = conn.recv(1024)
except ConnectionResetError as e: #记录连接异常
logger.error(f"Error: {e}", exc_info=True)
break
if not data:
print("client lost")
logger.warning("client lost")
else:
try :
print("receive:", data)
logger.info(f"server receive: {data.decode('utf-8')}")
cmd_ret = os.popen(data.decode('utf-8')).read()
cmd_ret = cmd_ret.encode('utf-8')
result = str(len(cmd_ret)).encode('utf-8') # 获得相应内容的长度
print(len(cmd_ret))
conn.send(result) # 先发送内容的长度给客户端
conn.sendall(cmd_ret)
print("send done")
logger.info("send done")
except Exception as e:
logger.error(f"Error: {e}",exc_info=True)
# python3 => bytes
# bytes -> str => decode
# str -> bytes => encode
while True:
Thread(target=server(), args=()).start()
日志格式设置:
import os
import logging.config
import time
def mkdir(path):
# 引入模块
# 去除首位空格
path = path.strip()
# 去除尾部 \ 符号
path = path.rstrip("\\")
# 判断路径是否存在
# 存在 True
# 不存在 False
isExists = os.path.exists(path)
# 判断结果
if not isExists:
# 如果不存在则创建目录
# 创建目录操作函数
os.makedirs(path)
return True
else:
# 如果目录存在则不创建,并提示目录已存在
return False
# 定义三种日志输出格式 开始
# 标准版 格式
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' \
'[%(levelname)s][%(message)s]' #其中name为getlogger指定的名字
# 简单版 格式
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
# boss版格式(lowb版)
id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'
# 定义日志输出格式 结束
#
# 定义要创建的目录
mkpath = "./Logs"
# 调用函数
mkdir(mkpath)
rq = time.strftime('%Y%m%d%H%M', time.localtime(time.time()))
logfile_name = f'{mkpath}/{rq}.log' # log文件名
# log配置字典
LOGGING_DIC = {
'version': 1, # 版本
'disable_existing_loggers': False, # 可否重复使用之前的logger对象
'formatters': {
# standard, simple, boss_formatter => 自己定义的名字,后面要用
'standard': {
'format': standard_format
},
'simple': {
'format': simple_format
},
'boss_formatter': {
'format': id_simple_format
},
},
# 日志过滤器
'filters': {},
# file/stream/rotating
'handlers': {
#打印到终端的日志
# stream, file => 自定义的名字
'stream': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'simple'
},
#打印到文件的日志,收集info及以上的日志 文件句柄
'file': {
'level': 20,
'class': 'logging.handlers.RotatingFileHandler', # 保存到文件
'formatter': 'standard', # 标准
'filename': logfile_name, # 日志文件
# 'maxBytes': 300, # 日志大小 300 bit
'backupCount': 5, #轮转文件数
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
},
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'log1': {
'handlers': [ 'file'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG', # 总级别
'propagate': True, # 向上(更高level的logger)传递
},
},
}
总结
这个小练习,让我最受用的就是我体会到了设置文件的好处。而且我们在编程的时候一定要考虑到日后维护的方便,所以一些编程的格式和模式都有讲究,不然以后苦的还是自己。