У меня есть языковой сервер pygls, работающий как дочерний процесс из моей основной программы.
Код: Выделить всё
if not os.path.exists(fifo_in_name):
os.mkfifo(fifo_in_name)
if not os.path.exists(fifo_out_name):
os.mkfifo(fifo_out_name)
try:
fifo_in = os.open(fifo_in_name, os.O_RDONLY | os.O_NONBLOCK)
fifo_out = os.open(fifo_out_name, os.O_WRONLY | os.O_NONBLOCK)
except OSError as e:
logger.error(f"Failed to open FIFO: {e}")
raise e
process = await asyncio.create_subprocess_exec(
sys.executable, 'lsp/lsp_server.py', fifo_in_name, fifo_out_name,
)
Код: Выделить всё
@contextmanager
def open_fifo(name, mode):
fd = os.open(name, mode | os.O_NONBLOCK)
try:
if mode & os.O_RDONLY:
file_obj = os.fdopen(fd, 'rb', buffering=0)
elif mode & os.O_WRONLY:
file_obj = os.fdopen(fd, 'wb', buffering=0)
else:
raise ValueError("Unsupported mode")
yield file_obj
finally:
file_obj.close()
if __name__ == "__main__":
fifo_in = sys.argv[1]
fifo_out = sys.argv[2]
with open_fifo(fifo_in, os.O_RDONLY) as f, open_fifo(fifo_out, os.O_WRONLY) as r:
server.start_io(f, r)
print("Started LSP server")
Подробнее здесь: https://stackoverflow.com/questions/786 ... amed-pipes