import threading
import time

import requests
import schedule
import uncurl

# Flag polled by run_schedule(); set it to True to make the scheduler loop exit.
# It must exist before the scheduler thread starts, otherwise the loop's first
# check raises NameError.
stop_thread = False

# Maps job name -> schedule job object, so a job can be cancelled by name later.
scheduled_jobs = {}


def execute_curl(curl_command: str) -> dict:
    """Run a cURL command through ``requests`` and summarise the response.

    The cURL command is translated by ``uncurl.parse`` into a string of Python
    source that performs the request; that source is then executed.

    Args:
        curl_command: The cURL command line to execute.

    Returns:
        A dict with keys ``'request_time'`` (seconds, rounded to 4 decimals),
        ``'status_code'`` and ``'content'`` (response body decoded as UTF-8).
    """
    # Convert the cURL command to Python requests code and make sure the
    # result is assigned to a variable we can read back from the namespace.
    request_code = f"response = {uncurl.parse(curl_command)}"

    namespace = {
        'requests': requests,
        'response': None,
    }

    # SECURITY: exec() runs code generated from curl_command. Only pass
    # commands from a trusted source to this function.
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments.
    start_time = time.perf_counter()
    exec(request_code, namespace)
    request_time = round(time.perf_counter() - start_time, 4)

    response = namespace['response']

    return {
        'request_time': request_time,
        'status_code': response.status_code,
        # Bodies are assumed to be UTF-8; undecodable bytes are replaced
        # instead of raising UnicodeDecodeError.
        'content': response.content.decode('utf-8', errors='replace'),
    }


def schedule_curl_command(curl_command: str, interval_seconds: int, job_name: str) -> None:
    """Schedule ``curl_command`` to run every ``interval_seconds`` seconds.

    The job is stored in ``scheduled_jobs`` under ``job_name``. If a job with
    that name is already registered it is cancelled first, so it does not keep
    running with no way to stop it by name.

    Args:
        curl_command: The cURL command line to execute on each run.
        interval_seconds: Interval between runs, in seconds.
        job_name: Name used to look the job up in ``scheduled_jobs``.
    """
    def run_curl():
        # An uncaught exception here would propagate out of
        # schedule.run_pending() and end the scheduler loop, so report the
        # failure and let the next run try again.
        try:
            response_details = execute_curl(curl_command)
        except Exception as exc:
            print(f"Job {job_name!r} failed: {exc!r}")
            return
        print(response_details)

    previous_job = scheduled_jobs.pop(job_name, None)
    if previous_job is not None:
        schedule.cancel_job(previous_job)

    scheduled_jobs[job_name] = schedule.every(interval_seconds).seconds.do(run_curl)


def stop_specific_curl_task(job_name: str) -> None:
    """Cancel the job registered under ``job_name``; do nothing if unknown."""
    job = scheduled_jobs.pop(job_name, None)
    if job is not None:
        schedule.cancel_job(job)


def run_schedule() -> None:
    """Run pending jobs once per second until ``stop_thread`` becomes true."""
    # Only reads the module-level flag, so no ``global`` declaration is needed.
    while not stop_thread:
        schedule.run_pending()
        time.sleep(1)


def print_scheduled_jobs() -> None:
    """Print the number of scheduled jobs followed by each job."""
    jobs = schedule.get_jobs()
    print(f"Currently scheduled jobs: {len(jobs)}")
    for job in jobs:
        print(f"Job: {job}")


def print_running_threads() -> None:
    """Print the number of live threads followed by each thread's name."""
    threads = threading.enumerate()
    print(f"Currently running threads: {len(threads)}")
    for thread in threads:
        print(f"Thread name: {thread.name}")


# Placeholder: put the cURL command to run here.
curl_command_1 = ''

# Schedule the cURL command to run every 60 seconds with job name "task1"
schedule_curl_command(curl_command_1, 60, "task1")

# run_schedule() polls this module-level flag, so it has to exist before the
# scheduler thread starts; otherwise the thread's first check raises NameError.
stop_thread = False

# Start the schedule runner in a separate thread
schedule_thread = threading.Thread(target=run_schedule, name="schedule_thread")
schedule_thread.start()

try:
    # Give the scheduler thread a few seconds to get going, then show the jobs
    time.sleep(5)
    print_scheduled_jobs()

    # Run for some time and then stop (for demonstration purposes)
    time.sleep(180)
    stop_specific_curl_task("task1")

    # Print the scheduled jobs after stopping the task
    print_scheduled_jobs()
finally:
    # Always stop the scheduler thread, even if the demo is interrupted
    # (e.g. Ctrl-C); a still-running non-daemon thread would otherwise keep
    # the process alive.
    stop_thread = True
    schedule_thread.join()