feat: Add models and routes for task management

This commit is contained in:
Ching 2024-06-27 18:07:24 +08:00
parent c4e589b8bf
commit bfe78b78f2
4 changed files with 196 additions and 102 deletions

59
app.py Normal file
View File

@ -0,0 +1,59 @@
from flask import Flask, request, jsonify
from models import Task
from scheduler import start_scheduler, stop_scheduler, stop_specific_task, scheduled_jobs
app = Flask(__name__)
@app.route('/add_task', methods=['POST'])
def add_task():
data = request.json
task, created = Task.get_or_create(name=data['name'], defaults={
'curl': data['curl'],
'interval': data['interval']
})
if not created:
return jsonify({'error': 'Task with this name already exists'}), 400
return jsonify({'message': 'Task added successfully'})
@app.route('/start_task', methods=['POST'])
def start_task():
data = request.json
task = Task.get(Task.name == data['name'])
task.running = True
task.save()
return jsonify({'message': 'Task started successfully'})
@app.route('/stop_task', methods=['POST'])
def stop_task():
data = request.json
stop_specific_task(data['name'])
return jsonify({'message': 'Task stopped successfully'})
@app.route('/get_tasks', methods=['GET'])
def get_tasks():
tasks = Task.select()
task_list = []
for task in tasks:
task_list.append({
'name': task.name,
'curl': task.curl,
'interval': task.interval,
'running': task.running,
'result': task.result
})
return jsonify(task_list)
@app.route('/stop_scheduler', methods=['GET'])
def stop():
stop_scheduler()
return jsonify({'message': 'Scheduler stopped successfully'})
# stop scheduler when the app is stopped or killed
@app.teardown_appcontext
def shutdown_scheduler(exception=None):
stop_scheduler()
if __name__ == '__main__':
start_scheduler()
app.run(debug=True)

View File

@ -1,102 +0,0 @@
import uncurl
import time
import requests
import schedule
import threading
def execute_curl(curl_command):
# Convert cURL command to Python requests code
request_code = uncurl.parse(curl_command)
request_code = f"response = {request_code}"
# Ensure response is assigned to a variable in the namespace
# Prepare a namespace to execute the generated code
namespace = {
'requests': requests,
'response': None
}
# Start timing
start_time = time.time()
# Execute the generated request code in the given namespace
exec(request_code, namespace)
# End timing
end_time = time.time()
# round to 4 decimal
request_time = round(end_time - start_time, 4)
# Get the response from the namespace
response = namespace['response']
# Collect response details
response_details = {
'request_time': request_time,
'status_code': response.status_code,
'content': response.content.decode('utf-8') # Assuming the content is in UTF-8
}
return response_details
# Dictionary to keep track of scheduled jobs
scheduled_jobs = {}
def schedule_curl_command(curl_command, interval_seconds, job_name):
def run_curl():
response_details = execute_curl(curl_command)
print(response_details)
# Schedule the run_curl function to run every interval_seconds
job = schedule.every(interval_seconds).seconds.do(run_curl)
# Store the job in the dictionary
scheduled_jobs[job_name] = job
def stop_specific_curl_task(job_name):
if job_name in scheduled_jobs:
schedule.cancel_job(scheduled_jobs[job_name])
del scheduled_jobs[job_name]
def run_schedule():
global stop_thread
while not stop_thread:
schedule.run_pending()
time.sleep(1)
def print_scheduled_jobs():
jobs = schedule.get_jobs()
print(f"Currently scheduled jobs: {len(jobs)}")
for job in jobs:
print(f"Job: {job}")
def print_running_threads():
threads = threading.enumerate()
print(f"Currently running threads: {len(threads)}")
for thread in threads:
print(f"Thread name: {thread.name}")
curl_command_1 = ''
# Schedule the cURL command to run every 60 seconds with job name "task1"
schedule_curl_command(curl_command_1, 60, "task1")
# Start the schedule runner in a separate thread
schedule_thread = threading.Thread(target=run_schedule, name="schedule_thread")
schedule_thread.start()
# Print the scheduled jobs
time.sleep(5) # Wait for a few seconds to ensure jobs are scheduled
print_scheduled_jobs()
# Run for some time and then stop (for demonstration purposes)
time.sleep(180) # Run for 180 seconds
stop_specific_curl_task("task1")
# Print the scheduled jobs after stopping the task
print_scheduled_jobs()
# Stop the scheduler thread
stop_thread = True
schedule_thread.join()

16
models.py Normal file
View File

@ -0,0 +1,16 @@
from peewee import SqliteDatabase, Model, CharField, IntegerField, BooleanField, TextField
db = SqliteDatabase('tasks.db')
class Task(Model):
name = CharField(unique=True)
curl = TextField()
interval = IntegerField()
running = BooleanField(default=False)
result = TextField(default='')
class Meta:
database = db
db.connect()
db.create_tables([Task])

121
scheduler.py Normal file
View File

@ -0,0 +1,121 @@
import uncurl
import time
import requests
import schedule
import threading
from models import Task
def execute_curl(curl_command):
# Convert cURL command to Python requests code
request_code = uncurl.parse(curl_command)
request_code = f"response = {request_code}"
# Ensure response is assigned to a variable in the namespace
# Prepare a namespace to execute the generated code
namespace = {
'requests': requests,
'response': None
}
# Start timing
start_time = time.time()
# Execute the generated request code in the given namespace
exec(request_code, namespace)
# End timing
end_time = time.time()
# round to 4 decimal
request_time = round(end_time - start_time, 4)
# Get the response from the namespace
response = namespace['response']
# Collect response details
response_details = {
'request_time': request_time,
'status_code': response.status_code,
'content': response.content.decode('utf-8') # Assuming the content is in UTF-8
}
return response_details
# Dictionary to keep track of scheduled jobs
scheduled_jobs = {}
stop_thread = False
def run_schedule():
global stop_thread
while not stop_thread:
for task in Task.select().where(Task.running == True):
if task.name in scheduled_jobs:
continue
job = threading.Thread(target=run_task, args=(task,))
job.start()
scheduled_jobs[task.name] = job
time.sleep(1)
def run_task(task):
while task.running:
result = execute_curl(task.curl)
task.result = result['content']
task.save()
time.sleep(task.interval)
def start_scheduler():
global stop_thread
stop_thread = False
scheduler_thread = threading.Thread(target=run_schedule)
scheduler_thread.start()
def stop_scheduler():
global stop_thread
stop_thread = True
for job in scheduled_jobs.values():
job.join()
scheduled_jobs.clear()
def stop_specific_task(task_name):
task = Task.get(Task.name == task_name)
task.running = False
task.save()
if task_name in scheduled_jobs:
scheduled_jobs[task_name].join()
del scheduled_jobs[task_name]
def print_scheduled_jobs():
jobs = schedule.get_jobs()
print(f"Currently scheduled jobs: {len(jobs)}")
for job in jobs:
print(f"Job: {job}")
def print_running_threads():
threads = threading.enumerate()
print(f"Currently running threads: {len(threads)}")
for thread in threads:
print(f"Thread name: {thread.name}")
# curl_command_1 = ''
# # Schedule the cURL command to run every 60 seconds with job name "task1"
# schedule_curl_command(curl_command_1, 60, "task1")
# # Start the schedule runner in a separate thread
# schedule_thread = threading.Thread(target=run_schedule, name="schedule_thread")
# schedule_thread.start()
# # Print the scheduled jobs
# time.sleep(5) # Wait for a few seconds to ensure jobs are scheduled
# print_scheduled_jobs()
# # Run for some time and then stop (for demonstration purposes)
# time.sleep(180) # Run for 180 seconds
# stop_specific_curl_task("task1")
# # Print the scheduled jobs after stopping the task
# print_scheduled_jobs()
# # Stop the scheduler thread
# stop_thread = True
# schedule_thread.join()