Что я пытаюсь сделать вот карточная игра (этап 10) с
1) графическим интерфейсом Kivy
2) связью asyncio сервер/клиент
Вот ссылка на репозиторий
-Я знаю, это грязно.
server.py:
Код: Выделить всё
import asyncio
import json
from common import Client
clients = {}
async def handle_client(reader, writer):
while True:
data = await reader.read(1024)
if not data:
break
message_json = data.decode()
message = json.loads(message_json)
type = None
try:
type = message["type"]
except:
print("There is no 'type' value in the message.")
break
if type == "register":
new_client = Client(reader,writer)
new_client.set_client_id(message["client_id"])
c_id = message["client_id"]
clients[c_id] = new_client
print(f"Received message: {message}")
writer.write(b"Message received: Registered Client {c_id}")
await writer.drain()
if type == "get_player":
print(f"GET PLAYER MESSAGE\n{message}")
async def main():
server = await asyncio.start_server(
handle_client, "127.0.0.1", 8888
)
async with server:
print("Server Listening")
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())
Код: Выделить всё
import asyncio
import json
from common import Client
class GameClient(Client):
def __init__(self):
super().__init__()
self.make_client_id()
def add_client(self,client):
self.reader = client.reader
self.writer = client.writer
try:
self.client_id = client.client_id
except:
print("No client_id for GameClient")
def get_client_id(self):
return self.client_id
async def connect(self, addr = "127.0.0.1", port = 8888):
self.reader,self.writer = await asyncio.open_connection(addr, port)
print(f"Client created and connected:\n\t{self.client_id}")
async def send_message(self, message: dict):
message_json = json.dumps(message)
self.writer.write(message_json.encode())
await self.writer.drain()
async def register(self):
message_d = {
"type": "register",
"client_id": self.client_id
}
await self.send_message(self, message_d)
data = await self.reader.read(1024)
message_dict = data.decode()
# message_dict = json.loads(message_json)
print(f"Received: {message_dict}")
async def start_client(self):
await self.connect()
await self.register()
self.writer.close()
await self.writer.wait_closed()
async def run(self):
await self.start_client()
while True:
data = await self.reader.read(1024)
if not data:
print("No data: breaking...")
break
message_dict = data.decode()
# message_dict = json.loads(message_json)
print(f"Received: {message_dict}")
Код: Выделить всё
import asyncio
import asynckivy as ak
from kivy.app import App, async_runTouchApp
from kivy.base import runTouchApp
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.gridlayout import GridLayout
from kivy.properties import ObjectProperty
from kivy.uix.screenmanager import Screen, ScreenManager
from phase10.client import GameClient
class GetPlayerScreen(Screen):
async def send_five(self):
message = {"type": "ready", "player": cl.client_id}
c = loop.create_task(cl.send_message(message))
class Loading(Screen):
pass
class PageMaster(ScreenManager):
pass
class PhaseTenApp(App):
def __init__(self,**kwargs):
super().__init__(**kwargs)
def build(self):
self.root = PageMaster()
self.root.add_widget(Loading(name = "loading"))
self.root.add_widget(GetPlayerScreen(name = "getplayer"))
self.root.current = "getplayer"
return self.root
cl = GameClient()
loop = asyncio.new_event_loop()
if __name__ == '__main__':
async def mainThread():
p10 = PhaseTenApp()
b = loop.create_task(p10.async_run())
a = loop.create_task(cl.run())
(done, pending) = await asyncio.wait({a}, return_when='FIRST_COMPLETED')
asyncio.run(mainThread())
Код: Выделить всё
:
BoxLayout:
orientation: "vertical"
AsyncImage:
source: "./assets/images/CardBack.png"
pos_hint: {"center_x":0.5}
:
conn_btn: conn_btn
name_input: name_input
BoxLayout:
orientation: "vertical"
Label:
text: "Name"
TextInput:
id: name_input
GridLayout:
cols: 3
Button:
id: conn_btn
text:"Go!"
Button:
text: "1"
on_press: root.send_five()
Button:
text: "2"
on_press: root.send_five()
Button:
text: "3"
on_press: root.send_five()
Button:
text: "4"
on_press: root.send_five()
Button:
text: "5"
on_press: root.send_five()
Я попробовал несколько подходов , я не уверен, что реализовал их правильно, поэтому, очевидно, я не смог успешно запустить приложение и не смог отправить сообщение на сервер после первоначального сообщения «регистрация».
У кого-нибудь есть идеи и, возможно, слова для преподать мне урок???
Заранее спасибо.
Я пробовал разделить клиент и графический интерфейс и запускать их из общего файла. В настоящее время работаю над тем, чтобы графический интерфейс был клиентом, так как я считаю, что это достойный метод для использования.
Подробнее здесь: https://stackoverflow.com/questions/790 ... th-asyncio
Мобильная версия