Правильно разорвать процесс при использовании pytest с многопроцессорной обработкой.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Правильно разорвать процесс при использовании pytest с многопроцессорной обработкой.

Сообщение Anonymous »

Я работаю над пакетом, который включает в себя сервер с соответствующим клиентом.
Я думал, что смогу сэкономить много усилий, имитируя все взаимодействия, и просто написать несколько тестовых примеров, которые проверят обе стороны одновременно. время.
Поэтому я написал приспособление, которое дает экземпляр multiprocessing.Process, внутри которого работает сервер.
Когда тесты завершены, я прекратите процесс, используяprocess.terminate(). К сожалению, есть еще один «осиротевший» процесс, с которым я не могу справиться.
Как мне удалить другой процесс или как мне вообще не запускать его?
Если вы знаете Я полностью открыт для способа сделать это с помощью другой библиотеки, отличной от многопроцессорной.
Чтобы воспроизвести проблему, я использовал код из руководства «Как сделать простой-python-rest-server-and-» клиент для создания простой пары Rest-сервер/клиент.

Код: Выделить всё

import asyncio
import logging
import multiprocessing as mp

import pytest
import pytest_asyncio
import requests
from flask import Flask, jsonify

def server():
app = Flask(__name__)

tasks = [
{
"id": 1,
"title": "Buy groceries",
"description": "Milk, Cheese, Pizza, Fruit, Tylenol",
"done": False,
},
{
"id": 2,
"title": "Learn Python",
"description": "Need to find a good Python tutorial on the web",
"done": False,
},
]

@app.route("/todo/api/v1.0/tasks", methods=["GET"])
def get_tasks():
return jsonify({"tasks": tasks})

app.run()

class TestMultiprocessingTester:

@pytest.fixture(scope="class")
def server_process(self):
server_process = mp.Process(target=server)
server_process.start()
yield server_process
server_process.terminate()
if server_process.is_alive():
server_process.kill()
logging.getLogger("Process").warning("shut down server process")

@pytest.mark.asyncio
async def test_client(self, server_process: mp.Process):
await asyncio.sleep(10)
url = "http://localhost:5000/todo/api/v1.0/tasks"
response = requests.get(url)
assert response.status_code == 200
assert response.json()["tasks"][0]["done"] is False

Изображение


Подробнее здесь: https://stackoverflow.com/questions/790 ... processing
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»