357 lines
14 KiB
Python
357 lines
14 KiB
Python
from selenium import webdriver
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.chrome.service import Service
|
||
from selenium.webdriver.chrome.options import Options
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
import requests
|
||
from PIL import Image
|
||
import pytesseract
|
||
import io
|
||
import time
|
||
import re
|
||
import os
|
||
import tkinter as tk
|
||
import ftplib
|
||
|
||
#
|
||
# def ftp_upload_with_proxy(server, username, password, local_file_path, remote_path, proxy):
|
||
# """使用 pycurl 上传单个文件到 FTP 服务器"""
|
||
# buffer = BytesIO()
|
||
# c = pycurl.Curl()
|
||
#
|
||
# # 设置 FTP URL
|
||
# ftp_url = f'ftp://{server}/{remote_path}'
|
||
# c.setopt(c.URL, ftp_url)
|
||
#
|
||
# # 设置 FTP 认证
|
||
# c.setopt(c.USERPWD, f"{username}:{password}")
|
||
# c.setopt(c.READDATA, open(local_file_path, 'rb'))
|
||
# c.setopt(c.UPLOAD, 1)
|
||
# c.setopt(c.PREQUOTE, ['PASV'])
|
||
#
|
||
# # 设置代理
|
||
# c.setopt(c.PROXY, proxy)
|
||
#
|
||
# try:
|
||
# c.perform()
|
||
# print(f"文件 {local_file_path} 成功上传到 {remote_path}!")
|
||
# except pycurl.error as e:
|
||
# print(f"FTP 错误: {e}")
|
||
# finally:
|
||
# c.close()
|
||
|
||
# def upload_directory_to_ftp(server, username, password, local_directory, remote_directory, proxy):
|
||
# """上传整个目录及其子目录到FTP服务器"""
|
||
# # 确保远程目录存在
|
||
# try:
|
||
# # 此段代码需要检查并创建远程目录
|
||
# # pycurl 不直接提供创建目录的功能,需要手动实现
|
||
#
|
||
# # 上传文件和子目录
|
||
# for root, dirs, files in os.walk(local_directory):
|
||
# # 上传子文件夹
|
||
# for dir_name in dirs:
|
||
# remote_dir_path = os.path.join(remote_directory,
|
||
# os.path.relpath(os.path.join(root, dir_name), local_directory)).replace(
|
||
# "\\", "/")
|
||
# print(f"准备上传目录: {remote_dir_path}")
|
||
# # 假设已确认远程目录存在
|
||
#
|
||
# # 上传文件
|
||
# for filename in files:
|
||
# local_file_path = os.path.join(root, filename) # 本地文件的完整路径
|
||
# relative_path = os.path.relpath(local_file_path, local_directory) # 计算相对路径
|
||
# remote_file_path = os.path.join(remote_directory, relative_path).replace("\\", "/") # 计算远程文件路径
|
||
#
|
||
# # 上传文件
|
||
# ftp_upload_with_proxy(server, username, password, local_file_path, remote_file_path, proxy)
|
||
#
|
||
# except Exception as e:
|
||
# print(f"上传目录时发生错误: {e}")
|
||
|
||
def upload_file_to_ftp(server, username, password, file_path, remote_path):
|
||
try:
|
||
with ftplib.FTP(server) as ftp:
|
||
ftp.login(user=username, passwd=password)
|
||
with open(file_path, 'rb') as file:
|
||
ftp.storbinary(f'STOR {remote_path}', file)
|
||
print(f"文件 {file_path} 成功上传到 {remote_path}!")
|
||
except ftplib.all_errors as e:
|
||
print(f"FTP 错误: {e}")
|
||
|
||
def upload_directory_to_ftp(server, username, password, local_directory, remote_directory):
|
||
"""上传整个目录及其子目录到FTP服务器"""
|
||
try:
|
||
with ftplib.FTP(server) as ftp:
|
||
ftp.login(user=username, passwd=password)
|
||
|
||
# 确保远程目录存在
|
||
try:
|
||
ftp.cwd(remote_directory) # 改变到远程目录
|
||
except ftplib.error_perm:
|
||
ftp.mkd(remote_directory) # 如果远程目录不存在,则创建
|
||
ftp.cwd(remote_directory) # 再次进入远程目录
|
||
|
||
# 遍历本地目录树
|
||
for root, dirs, files in os.walk(local_directory):
|
||
# 上传子文件夹
|
||
for dir_name in dirs:
|
||
remote_dir_path = os.path.join(remote_directory, os.path.relpath(os.path.join(root, dir_name), local_directory)).replace("\\", "/")
|
||
try:
|
||
ftp.mkd(remote_dir_path) # 在FTP服务器上创建子目录
|
||
print(f"已创建远程目录: {remote_dir_path}")
|
||
except ftplib.error_perm:
|
||
pass # 如果目录已经存在则忽略
|
||
|
||
# 上传文件
|
||
for filename in files:
|
||
local_file_path = os.path.join(root, filename) # 本地文件的完整路径
|
||
relative_path = os.path.relpath(local_file_path, local_directory) # 计算相对路径
|
||
remote_file_path = os.path.join(remote_directory, relative_path).replace("\\", "/") # 计算远程文件路径
|
||
ftp_upload_with_proxy(server, username, password, local_file_path, remote_file_path, proxy_url) # 上传文件
|
||
|
||
except ftplib.all_errors as e:
|
||
print(f"上传目录时发生错误: {e}")
|
||
|
||
def launch_proxy():
|
||
print("开启代理...")
|
||
cmd = "start ./Shadowsocks-4.4.1.0/Shadowsocks.exe"
|
||
os.system('chcp 65001')
|
||
os.system(cmd)
|
||
|
||
def close_proxy():
|
||
print("杀死代理进程")
|
||
cmd = "taskkill /f /im Shadowsocks.exe"
|
||
os.system('chcp 65001')
|
||
os.system(cmd)
|
||
cmd = """
|
||
reg add "HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings" /v ProxyEnable /t REG_DWORD /d 0 /f
|
||
"""
|
||
print(cmd)
|
||
os.system(cmd)
|
||
|
||
|
||
def shot(chromedriver_path ,chrome_path):
|
||
|
||
options = Options()
|
||
options.add_experimental_option("detach", True)
|
||
options.binary_location = chrome_path
|
||
options.add_argument("--no-sandbox")
|
||
# 初始化 WebDriver
|
||
service = Service(chromedriver_path)
|
||
driver = webdriver.Chrome(service=service, options=options)
|
||
driver.get("http://125.64.9.222:8022/login_exam.php")
|
||
|
||
try:
|
||
|
||
launch_proxy()
|
||
time.sleep(5)
|
||
|
||
exam_id = name_entry.get()
|
||
student_id = id_entry.get()
|
||
password = password_entry.get()
|
||
|
||
try:
|
||
# 创建文件夹
|
||
os.mkdir(folder_path) # 如果目录已存在,则会抛出OSError
|
||
print(f"文件夹 '{folder_path}' 创建成功!")
|
||
|
||
except FileExistsError:
|
||
print(f"文件夹 '{folder_path}' 已存在!")
|
||
except Exception as e:
|
||
print(f"创建文件夹时发生错误: {e}")
|
||
|
||
# 填写 exam_id, student_id 和 password
|
||
driver.find_element(By.ID, "exam_id").send_keys(exam_id) # exam_id
|
||
driver.find_element(By.ID, "student_id").send_keys(student_id) # student_id
|
||
driver.find_element(By.ID, "password").send_keys(password) # password
|
||
|
||
# 获取验证码图片
|
||
captcha_image = driver.find_element(By.CSS_SELECTOR, "img[src='./lib/image_code_1.php']")
|
||
|
||
# 截取验证码图片
|
||
captcha = captcha_image.screenshot_as_png
|
||
|
||
# 打开验证码图片
|
||
captcha = Image.open(io.BytesIO(captcha))
|
||
captcha.save("captcha.png") # 可选,保存图片以供调试
|
||
captcha_text = pytesseract.image_to_string(captcha, config="--psm 6")
|
||
print(f"识别到的验证码: {captcha_text}")
|
||
|
||
# 填写验证码
|
||
driver.find_element(By.ID, "imageCode").send_keys(captcha_text.strip())
|
||
|
||
# 提交表单
|
||
driver.find_element(By.ID, "ok").click() # 替换为实际的提交按钮 ID
|
||
|
||
# 获取页面 Cookies
|
||
cookies = driver.get_cookies()
|
||
session_cookies = {cookie['name']: cookie['value'] for cookie in cookies}
|
||
|
||
# 用 requests 请求目标 URL
|
||
url = "http://125.64.9.222:8022/paper/student_server_info.php"
|
||
response = requests.get(url, cookies=session_cookies)
|
||
response.raise_for_status() # 确保请求成功
|
||
print(response.text) # 打印响应内容
|
||
|
||
matches = [match.strip() for match in re.findall(r"(?<=:).*?(?=<br>)", response.text)]
|
||
print(matches)
|
||
|
||
|
||
with open("info.txt", "w", encoding="utf-8") as f:
|
||
for match in matches:
|
||
f.write(match + "\n") # 保存结果到文件
|
||
|
||
# command = "start result.txt"
|
||
# subprocess.Popen(command, shell=True) # 打开结果文件
|
||
# 等待页面加载或处理结果
|
||
# WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "result"))) # 替换为实际的结果 ID
|
||
# print("登录成功!")
|
||
|
||
exam_config = f"""
|
||
<?php
|
||
$link=mysqli_connect('localhost','{matches[2]}','{matches[3]}','{matches[5]}');
|
||
mysqli_query($link,'set names utf8');
|
||
|
||
// 设置默认时区
|
||
date_default_timezone_set("Asia/Shanghai");
|
||
"""
|
||
|
||
with open(folder_path + "db_config.php", "w", encoding="utf-8") as f:
|
||
f.write(exam_config) # 保存 db_config.php 文件
|
||
|
||
ftp_server = matches[0]
|
||
ftp_username = matches[2]
|
||
ftp_password = matches[3]
|
||
# 调用上传函数
|
||
upload_directory_to_ftp(ftp_server, ftp_username, ftp_password, local_file_path, remote_file_path, proxy_url)
|
||
|
||
# url = f"http://{matches[0]}:{matches[1]}/exam_setup.php"
|
||
# response = requests.get(url, cookies=session_cookies)
|
||
# print(response.raise_for_status())
|
||
# print(response.text)
|
||
time.sleep(5)
|
||
driver.get(f"http://{matches[0]}:{matches[1]}/setup.html")
|
||
|
||
# 等待页面加载完成
|
||
# WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "kw")))
|
||
# 输入搜索词
|
||
try:
|
||
button = WebDriverWait(driver, 10).until(
|
||
EC.element_to_be_clickable((By.XPATH, "//button[text()='create table']")) # 使用按钮的文本查找元素
|
||
)
|
||
button.click() # 点击按钮
|
||
except Exception as e:
|
||
print(f"发生错误: {e}")
|
||
finally:
|
||
result_content = ""
|
||
try:
|
||
result_element = WebDriverWait(driver, 100).until(
|
||
EC.presence_of_element_located((By.ID, "result"))
|
||
)
|
||
|
||
# 获取内容
|
||
result_content = result_element.text
|
||
print(f"ID为result的内容: {result_content}")
|
||
|
||
except Exception as e:
|
||
print(f"发生错误: {e}")
|
||
|
||
print("执行完毕:" + result_content)
|
||
|
||
|
||
driver.get("http://125.64.9.222:8022/paper/paper.php")
|
||
|
||
# 等待页面加载
|
||
wait = WebDriverWait(driver, 30)
|
||
for fillingQ_id in fillingQ_ids:
|
||
textarea = wait.until(EC.presence_of_element_located((By.ID, fillingQ_id)))
|
||
with open (fillingQ_path + f"/{fillingQ_id}.txt", "r", encoding="utf-8") as f:
|
||
textarea.send_keys(f.read()) # 填写题目
|
||
|
||
for save in fillingQ_save_ids:
|
||
driver.execute_script(f"{save}")
|
||
print(f"已保存 {save} 题目")
|
||
time.sleep(1)
|
||
|
||
tk.messagebox.showinfo("提示","执行成功!请手动进入网站点击调试")
|
||
except Exception as e:
|
||
print(f"发生错误: {e}")
|
||
tk.messagebox.showerror("错误", f"发生错误: {e}")
|
||
|
||
finally:
|
||
close_proxy()
|
||
driver.quit()
|
||
|
||
if __name__ == "__main__":
|
||
# 设置 Tesseract OCR 的路径
|
||
pytesseract.pytesseract.tesseract_cmd = r'.\Tesseract\tesseract.exe'
|
||
folder_path = "./20250103/"
|
||
chrome_path = "./chrome-win64/chrome.exe" # 替换为你的路径
|
||
chromedriver_path = "./chromedriver.exe" # 替换为你的 ChromeDriver 路径
|
||
|
||
exam_id = "20250103" # 填写题号
|
||
student_id = "" # 填写学号
|
||
password = "" # 填写密码
|
||
|
||
ftp_server = 'ftp.example.com' # 替换为FTP服务器地址
|
||
ftp_username = 'your_username' # 替换为FTP用户名
|
||
ftp_password = 'your_password' # 替换为FTP密码
|
||
proxy_url = "http://127.0.0.1:22222" #代理地址
|
||
local_file_path = folder_path # 替换为要上传的本地文件路径
|
||
fillingQ_path = "./blankFillingQ"
|
||
remote_file_path = '/' # 替换为远程服务器上保存的文件路径
|
||
|
||
fillingQ_ids = ["textarea_student_answer_01", "textarea_student_answer_01_2_e","textarea_student_answer_01_3_e","textarea_student_answer_02","textarea_student_answer_521_1",
|
||
"textarea_student_answer_60_2_e"]
|
||
fillingQ_save_ids = ["save('01')", "save('01_2_e')","save('01_3_e')","save('02')","save('521_1')"]
|
||
|
||
root = tk.Tk()
|
||
root.title("一键执行")
|
||
|
||
# 设置窗口大小
|
||
window_width = 300
|
||
window_height = 200
|
||
root.geometry(f"{window_width}x{window_height}")
|
||
|
||
# 获取屏幕宽度和高度
|
||
screen_width = root.winfo_screenwidth()
|
||
screen_height = root.winfo_screenheight()
|
||
|
||
# 计算主窗口的位置
|
||
x = (screen_width // 2) - (window_width // 2)
|
||
y = (screen_height // 2) - (window_height // 2)
|
||
|
||
# 设置窗口位置
|
||
root.geometry(f"{window_width}x{window_height}+{x}+{y}")
|
||
|
||
# 标签
|
||
label = tk.Label(root, text="题号")
|
||
label.place(x=50, y=20)
|
||
name_entry = tk.Entry(root)
|
||
name_entry.pack(pady=10)
|
||
name_entry.place(x=100, y=20)
|
||
name_entry.insert(0, exam_id)
|
||
name_entry.config(state="disabled")
|
||
|
||
label2 = tk.Label(root, text="学号")
|
||
label2.place(x=50, y=60)
|
||
id_entry = tk.Entry(root)
|
||
id_entry.pack(pady=10)
|
||
id_entry.place(x=100, y=60)
|
||
label3 = tk.Label(root, text="密码")
|
||
label3.place(x=50, y=100)
|
||
password_entry = tk.Entry(root, show="*")
|
||
password_entry.pack(pady=10)
|
||
password_entry.place(x=100, y=100)
|
||
|
||
# 按钮
|
||
submit_button = tk.Button(root, text="执行", command=lambda: shot(chromedriver_path, chrome_path))
|
||
submit_button.pack(pady=10)
|
||
submit_button.pack(side=tk.BOTTOM)
|
||
submit_button.config(width=10, height=1)
|
||
|
||
|
||
|
||
root.mainloop() |