feat(http): started http server and started testing

This commit is contained in:
darkicewolf50 2024-11-24 15:07:36 -07:00
parent c986b2b272
commit acf1991484
6 changed files with 77 additions and 78 deletions

View File

@ -17,10 +17,10 @@ def getSchedulePackager():
``Contact``: darkicewolf50@gmail.ocm
"""
res = {
"isBase64ENcoded": "false",
"statusCode": 200,
"body": ymlschedule
}
return json.dumps(res)
return {
"statusCode": 200,
"isBase64ENcoded": "false",
"body": json.dumps(ymlschedule)
}

110
main.py
View File

@ -10,15 +10,15 @@ class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""Handle GET requests and route based on path"""
if self.path == '/appointments':
self._send_response(doNothing())
if self.path == '/getAppointments':
self._send_response(getSchedulePackager())
elif self.path == '/interview_data':
self.wfile.write(doNothing())
self._send_response(doNothing()) # If you want to return an empty response
else:
self._send_error()
self._send_error(404, "Not Found")
def do_POST(self):
"""Handle POST requests to '/appointments' or '/interview_data'"""
"""Handle POST requests to '/selectAppointment' or '/'"""
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
@ -26,74 +26,65 @@ class RequestHandler(BaseHTTPRequestHandler):
try:
json_data = json.loads(post_data.decode('utf-8'))
except json.JSONDecodeError:
self._send_response(400, "Invalid Request")
self._send_error(400, "Invalid Request")
return
if self.path == '/appointments':
self._send_response(SelectAppointment(json_data))
elif self.path == '/nothing':
self._send_response(doNothing())
if self.path == '/selectAppointment':
self._send_response(SelectAppointment(json_data)) # Use SelectAppointment directly
elif self.path == '/':
self._send_response(doNothing()) # Empty response for the root route
else:
self._send_error(404)
self._send_error(404, "Not Found")
# def do_DELETE(self):
# """Handle DELETE requests to '/appointments' or '/interview_data'"""
# content_length = int(self.headers['Content-Length'])
# delete_data = self.rfile.read(content_length)
def _send_response(self, res):
"""Send the response body with the given status code"""
if isinstance(res, dict):
# Ensure res has all the required keys
body = res.get("body", "") # Default to empty string if no body is provided
baseEncoded = res.get("isBase64ENcoded", "false") # Default to "false" if not provided
status_code = res.get("statusCode", 200) # Default to 200 if no statusCode is provided
# # Try to parse JSON data from the DELETE request body
# try:
# json_data = json.loads(delete_data.decode('utf-8'))
# except json.JSONDecodeError:
# self.send_error(400, "Invalid JSON")
# return
# Convert body to string if it's not already
if isinstance(body, dict):
body = json.dumps(body)
# if self.path == '/appointments':
# self.handle_appointments_delete(json_data)
# elif self.path == '/interview_data':
# self.handle_interview_data_delete(json_data)
# else:
# self.send_error(404, "Not Found")
# If body is base64 encoded, we would handle that differently, but currently we treat all as 'false'
def _send_response(self, response):
"""Send the response body, status code, and headers directly"""
status_code = response['statusCode']
headers = response['headers']
body = json.dumps(response['body']).encode('utf-8') # Encoding happens here
response = {
"statusCode": status_code,
"isBase64Encoded": baseEncoded,
"headers": {
"Content-Type": "application/json", # Specify content type
"X-IsBase64Encoded": baseEncoded # Indicating if the body is base64 encoded
},
"body": body
}
# Set the response status code
# Send the status code and headers
self.send_response(status_code)
# Set the response headers
for header, value in headers.items():
self.send_header(header, value)
# End the headers
self.send_header("Content-Type", "application/json")
self.end_headers()
# Send the response body
self.wfile.write(body)
def _send_error(self):
{"statusCode": 404,
"isBase64ENcoded": "false",
"headers": {
"Content-Type": "application/json", # Specify content type
"X-IsBase64Encoded": "false" # Indicating that the body is not base64 encoded
},
"body": "Invalid Response"}
self._send_response
# Write the JSON response body, ensuring it's encoded to utf-8
self.wfile.write(json.dumps(response).encode('utf-8'))
def _send_error(self, status_code=404, message="Not Found"):
"""Send error response with the same structure as normal responses"""
res = {
"statusCode": status_code,
"body": json.dumps({"message": message})
}
# Return the error response in the same format
self._send_response(res)
def doNothing():
"""Return an empty JSON response"""
return {
"statusCode": 200,
"isBase64ENcoded": "false",
"body": json.dumps({"message": ""})
}
return json.dumps({"statusCode": 200,
"isBase64ENcoded": "false",
"headers": {
"Content-Type": "application/json", # Specify content type
"X-IsBase64Encoded": "false" # Indicating that the body is not base64 encoded
},
"body": ""})
# Graceful shutdown handler
def signal_handler(sig, frame):
@ -114,7 +105,4 @@ if __name__ == "__main__":
# Start the server and listen for requests
httpd.serve_forever()
except KeyboardInterrupt:
# Server shutdown is handled in signal_handler
pass
print(getSchedulePackager())
print(SelectAppointment("10:00 am"))

View File

@ -17,20 +17,17 @@ def SelectAppointment (appointmentJson):
"""
status = mockWriteFunction(appointmentJson)
res = {"statusCode": 200,
"isBase64ENcoded": "false",
"headers": {
"Content-Type": "application/json", # Specify content type
"X-IsBase64Encoded": "false" # Indicating that the body is not base64 encoded
},
"body": ""}
if status:
res["body"] = {"Success": True}
resBody = {"Success": True}
else:
res["body"] = {"Success": False}
resBody = {"Success": False}
return json.dumps(res)
return {
"statusCode": 200,
"isBase64ENcoded": "false",
"body": json.dumps(resBody)
}
def mockWriteFunction(appTime):
return 0

14
testhttp.py Normal file
View File

@ -0,0 +1,14 @@
import json
import requests
import timeit
def BenchMark():
rawRes = requests.get("http://localhost:8080/getAppointments")
res = json.loads(rawRes.text)
# print(json.dumps(res, indent=1))
if __name__ == "__main__":
print(timeit.timeit(
stmt=BenchMark,
number=10))