From c4e589b8bf89dcd7c4b7c5d0bd0627d3ce0cfe14 Mon Sep 17 00:00:00 2001 From: Ching Date: Thu, 27 Jun 2024 17:17:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E4=BA=86=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E8=84=9A=E6=9C=AC=EF=BC=8C=E4=BD=BF=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E5=8F=AF=E4=BB=A5=E6=89=A7=E8=A1=8CcURL=E5=91=BD=E4=BB=A4?= =?UTF-8?q?=E5=B9=B6=E6=8C=89=E6=8C=87=E5=AE=9A=E6=97=B6=E9=97=B4=E9=97=B4?= =?UTF-8?q?=E9=9A=94=E8=B0=83=E5=BA=A6=E8=BF=90=E8=A1=8C=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 脚本利用`uncurl`库将cURL命令转换为Python requests代码,并使用`schedule`库来调度命令的执行。 脚本提供了以下功能: - `execute_curl(curl_command)`: 接受一个cURL命令作为输入,将其转换为Python requests代码,执行代码,并返回响应详情。 - `schedule_curl_command(curl_command, interval_seconds, job_name)`: 使用`schedule`库按指定时间间隔调度cURL命令的执行。 - `stop_specific_curl_task(job_name)`: 停止执行特定的调度cURL任务。 - `run_schedule()`: 启动一个单独的线程来运行调度的cURL任务。 - `print_scheduled_jobs()`: 打印当前已调度的任务。 - `print_running_threads()`: 打印当前运行的线程。 提交还包括一个使用示例,其中一个cURL命令被调度为每60秒运行一次,任务名为“task1”。运行180秒后,任务停止并打印已调度的任务。 这个脚本提供了一种便捷的方式来自动执行cURL命令,并按特定时间间隔调度运行。 --- get_resp.py | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 get_resp.py diff --git a/get_resp.py b/get_resp.py new file mode 100644 index 0000000..cfe1f22 --- /dev/null +++ b/get_resp.py @@ -0,0 +1,102 @@ +import uncurl +import time +import requests +import schedule +import threading + +def execute_curl(curl_command): + # Convert cURL command to Python requests code + request_code = uncurl.parse(curl_command) + request_code = f"response = {request_code}" + # Ensure response is assigned to a variable in the namespace + + # Prepare a namespace to execute the generated code + namespace = { + 'requests': requests, + 'response': None + } + + # Start timing + start_time = time.time() + + # Execute the generated request code in the given namespace + exec(request_code, namespace) + + # End timing + end_time = time.time() + # round to 4 decimal + request_time = round(end_time - start_time, 4) + + # Get the response from the namespace + response = namespace['response'] + + # Collect response details + response_details = { + 'request_time': request_time, + 'status_code': response.status_code, + 'content': response.content.decode('utf-8') # Assuming the content is in UTF-8 + } + + return response_details + + + +# Dictionary to keep track of scheduled jobs +scheduled_jobs = {} + +def schedule_curl_command(curl_command, interval_seconds, job_name): + def run_curl(): + response_details = execute_curl(curl_command) + print(response_details) + + # Schedule the run_curl function to run every interval_seconds + job = schedule.every(interval_seconds).seconds.do(run_curl) + + # Store the job in the dictionary + scheduled_jobs[job_name] = job + +def stop_specific_curl_task(job_name): + if job_name in scheduled_jobs: + schedule.cancel_job(scheduled_jobs[job_name]) + del scheduled_jobs[job_name] + +def run_schedule(): + global stop_thread + while not stop_thread: + schedule.run_pending() + time.sleep(1) + +def print_scheduled_jobs(): + jobs = schedule.get_jobs() + print(f"Currently scheduled jobs: {len(jobs)}") + for job in jobs: + print(f"Job: {job}") + +def print_running_threads(): + threads = threading.enumerate() + print(f"Currently running threads: {len(threads)}") + for thread in threads: + print(f"Thread name: {thread.name}") + +curl_command_1 = '' +# Schedule the cURL command to run every 60 seconds with job name "task1" +schedule_curl_command(curl_command_1, 60, "task1") + +# Start the schedule runner in a separate thread +schedule_thread = threading.Thread(target=run_schedule, name="schedule_thread") +schedule_thread.start() + +# Print the scheduled jobs +time.sleep(5) # Wait for a few seconds to ensure jobs are scheduled +print_scheduled_jobs() + +# Run for some time and then stop (for demonstration purposes) +time.sleep(180) # Run for 180 seconds +stop_specific_curl_task("task1") + +# Print the scheduled jobs after stopping the task +print_scheduled_jobs() + +# Stop the scheduler thread +stop_thread = True +schedule_thread.join()