Следующий код отлично работает, когда я использую этот API с помощью пользовательского интерфейса Swagger. Однако он не работает, когда к нему обращаются из интерфейса реагирования, показывая код состояния 422 необрабатываемый объект. Ниже приведена моя конечная точка FastAPI:
@post.post('/post')
async def create_post(post_text: str, image: bytes = File(None), token: str = Depends(oauth2_scheme), db: Session = Depends(database.get_db)):
user_info = await services.get_user_info(token)
username = user_info["username"]
print(username)
image_url = None
if image:
# Generate a unique filename for the image
image_filename = f"{username}_{uuid.uuid4().hex}.jpg"
# print(image_filename)
# Save the image to bytes and send to MinIO bucket
# image_bytes = await image.read()
image_bytes = base64.b64decode(image.split(",")[1])
minio_client.put_object(
"minilinkedindistributed",
image_filename,
io.BytesIO(image_bytes),
length=len(image_bytes),
content_type="image/jpeg"
)
# Construct the image URL based on MinIO server URL and bucket name
image_url = f"http://127.0.0.1:9000/minilinkedindistributed/{image_filename}"
print(image_url)
elif image is None or image.filename == '':
raise HTTPException(status_code=400, detail='Invalid or empty image file')
# Create the post
new_post = services.make_post(db, username, post_text, image_url)
headers = {"Authorization": f"Bearer {token}"}
# Get all users (except the one who posted)
async with httpx.AsyncClient() as client:
response = await client.get("http://127.0.0.1:8000/api/v1/all_users_except_poster", headers=headers)
if response.status_code == 200:
all_users_except_poster = response.json()
else:
raise HTTPException(status_code=response.status_code, detail="Failed to fetch users from user service")
# Create a notification for each user
for user_to_notify in all_users_except_poster:
notification_data = {
'notification_text': f"{username} made a new post...",
'pid' : new_post.id,
'username' : user_to_notify["username"],
'notification_datetime' : datetime.utcnow().isoformat(),
'is_read' : False
}
async with httpx.AsyncClient() as client:
response = await client.post("http://127.0.0.1:8002/api/v1/notification", json=notification_data, headers=headers)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Failed to send notification")
return{"message" : new_post}
Следующий код отлично работает, когда я использую этот API с помощью пользовательского интерфейса Swagger. Однако он не работает, когда к нему обращаются из интерфейса реагирования, показывая код состояния 422 необрабатываемый объект. Ниже приведена моя конечная точка FastAPI: [code]@post.post('/post') async def create_post(post_text: str, image: bytes = File(None), token: str = Depends(oauth2_scheme), db: Session = Depends(database.get_db)): user_info = await services.get_user_info(token) username = user_info["username"] print(username) image_url = None if image: # Generate a unique filename for the image image_filename = f"{username}_{uuid.uuid4().hex}.jpg" # print(image_filename)
# Save the image to bytes and send to MinIO bucket # image_bytes = await image.read() image_bytes = base64.b64decode(image.split(",")[1])
# Construct the image URL based on MinIO server URL and bucket name image_url = f"http://127.0.0.1:9000/minilinkedindistributed/{image_filename}" print(image_url) elif image is None or image.filename == '': raise HTTPException(status_code=400, detail='Invalid or empty image file') # Create the post new_post = services.make_post(db, username, post_text, image_url)
headers = {"Authorization": f"Bearer {token}"}
# Get all users (except the one who posted) async with httpx.AsyncClient() as client: response = await client.get("http://127.0.0.1:8000/api/v1/all_users_except_poster", headers=headers)
if response.status_code == 200: all_users_except_poster = response.json() else: raise HTTPException(status_code=response.status_code, detail="Failed to fetch users from user service")
# Create a notification for each user for user_to_notify in all_users_except_poster: notification_data = { 'notification_text': f"{username} made a new post...", 'pid' : new_post.id, 'username' : user_to_notify["username"], 'notification_datetime' : datetime.utcnow().isoformat(), 'is_read' : False }
async with httpx.AsyncClient() as client: response = await client.post("http://127.0.0.1:8002/api/v1/notification", json=notification_data, headers=headers)
if response.status_code != 200: raise HTTPException(status_code=response.status_code, detail="Failed to send notification")
// Make a POST request to create a new post try { const response = await axios.post(`${API_BASE_URL}/post`, formData, { headers: { Authorization: `Bearer ${access_token}`, 'Content-Type': 'multipart/form-data', // Set the correct content type for the form data }, });
// Refresh the posts after successful submission fetchPosts(access_token);