import uncurl import time import requests import schedule import threading from models import Task def execute_curl(curl_command): # Convert cURL command to Python requests code request_code = uncurl.parse(curl_command) request_code = f"response = {request_code}" # Ensure response is assigned to a variable in the namespace # Prepare a namespace to execute the generated code namespace = { 'requests': requests, 'response': None } # Start timing start_time = time.time() # Execute the generated request code in the given namespace exec(request_code, namespace) # End timing end_time = time.time() # round to 4 decimal request_time = round(end_time - start_time, 4) # Get the response from the namespace response = namespace['response'] # Collect response details response_details = { 'request_time': request_time, 'status_code': response.status_code, 'content': response.content.decode('utf-8') # Assuming the content is in UTF-8 } return response_details # Dictionary to keep track of scheduled jobs scheduled_jobs = {} stop_thread = False def run_schedule(): global stop_thread while not stop_thread: for task in Task.select().where(Task.running == True): if task.name in scheduled_jobs: continue job = threading.Thread(target=run_task, args=(task,)) job.start() scheduled_jobs[task.name] = job time.sleep(1) def run_task(task): while task.running: result = execute_curl(task.curl) task.result = result['content'] task.save() time.sleep(task.interval) def start_scheduler(): global stop_thread stop_thread = False scheduler_thread = threading.Thread(target=run_schedule) scheduler_thread.start() def stop_scheduler(): global stop_thread stop_thread = True for job in scheduled_jobs.values(): job.join() scheduled_jobs.clear() def stop_specific_task(task_name): task = Task.get(Task.name == task_name) task.running = False task.save() if task_name in scheduled_jobs: scheduled_jobs[task_name].join() del scheduled_jobs[task_name] def print_scheduled_jobs(): jobs = schedule.get_jobs() print(f"Currently scheduled jobs: {len(jobs)}") for job in jobs: print(f"Job: {job}") def print_running_threads(): threads = threading.enumerate() print(f"Currently running threads: {len(threads)}") for thread in threads: print(f"Thread name: {thread.name}") # curl_command_1 = '' # # Schedule the cURL command to run every 60 seconds with job name "task1" # schedule_curl_command(curl_command_1, 60, "task1") # # Start the schedule runner in a separate thread # schedule_thread = threading.Thread(target=run_schedule, name="schedule_thread") # schedule_thread.start() # # Print the scheduled jobs # time.sleep(5) # Wait for a few seconds to ensure jobs are scheduled # print_scheduled_jobs() # # Run for some time and then stop (for demonstration purposes) # time.sleep(180) # Run for 180 seconds # stop_specific_curl_task("task1") # # Print the scheduled jobs after stopping the task # print_scheduled_jobs() # # Stop the scheduler thread # stop_thread = True # schedule_thread.join()