The following code, developed in part with ChatGPT, can be used with BPQ32 to create a chat server. When a user connects to the alias/node, the server logs who has connected and then echoes every message it receives from that link to every connected user (including the sender).
import socket
import threading


class ChatServer:
    """Minimal threaded TCP chat server.

    Every accepted connection gets its own handler thread. The first chunk
    of data received on a connection is taken as that user's name; every
    later chunk is relayed to all registered clients (the sender included).
    """

    def __init__(self):
        self.host = 'localhost'
        self.port = 33000
        self.server = None
        # Maps each connected client socket to the username it announced.
        self.clients = {}
        # Guards self.clients, which is touched from every handler thread.
        self.lock = threading.Lock()

    def start(self):
        """Bind the listening socket and accept connections forever.

        Blocks the calling thread. The listening socket is closed when the
        accept loop is left through an exception (e.g. KeyboardInterrupt).
        """
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server.bind((self.host, self.port))
            self.server.listen(5)
            print("Chat server started on {}:{}".format(self.host, self.port))
            while True:
                client_socket, client_address = self.server.accept()
                print("New connection from:", client_address)
                # Daemon threads: handlers blocked in recv() must not keep
                # the process alive once the main thread has stopped.
                threading.Thread(
                    target=self.handle_client,
                    args=(client_socket,),
                    daemon=True,
                ).start()
        finally:
            self.server.close()

    def handle_client(self, client_socket):
        """Serve one connection until the peer disconnects or errors out.

        The first received chunk is used as the username (surrounding
        whitespace stripped); each following chunk is broadcast as-is.
        NOTE(review): this assumes the peer sends the username as its own
        first packet - TCP itself does not guarantee that framing.
        """
        try:
            first_chunk = client_socket.recv(1024)
        except OSError:
            first_chunk = b''
        if not first_chunk:
            # Peer vanished before identifying itself; nothing to register.
            client_socket.close()
            return

        # errors='replace' keeps one odd byte from killing the thread.
        username = first_chunk.decode('utf-8', errors='replace').strip()
        self.add_client(client_socket, username)
        try:
            while True:
                data = client_socket.recv(1024)
                if not data:
                    break  # orderly shutdown by the peer
                self.broadcast(data.decode('utf-8', errors='replace'))
        except OSError:
            # Reset, timeout, or the socket was closed by broadcast() after
            # a failed send: all are treated as a disconnect.
            pass
        finally:
            self.remove_client(client_socket)

    def add_client(self, client_socket, username):
        """Register client_socket under username and log the join."""
        with self.lock:
            self.clients[client_socket] = username
        print("New user joined:", username)

    def remove_client(self, client_socket):
        """Unregister and close client_socket.

        Safe to call more than once for the same socket: only the call that
        actually removes the entry logs the disconnect.
        """
        with self.lock:
            username = self.clients.pop(client_socket, None)
        try:
            # Wake up a handler thread that may still be blocked in recv().
            client_socket.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # already closed or never fully connected
        client_socket.close()
        if username is not None:
            print("User disconnected:", username)

    def broadcast(self, message):
        """Send message (a str) to every registered client.

        A client whose send fails is dropped afterwards; the remaining
        clients still receive the message.
        """
        payload = message.encode('utf-8')
        failed = []
        # Sending under the lock keeps concurrent broadcasts from
        # interleaving their bytes on the same socket.
        with self.lock:
            for client_socket in self.clients:
                try:
                    client_socket.sendall(payload)
                except OSError:
                    failed.append(client_socket)
        # Removal re-acquires the lock, so it must happen outside of it.
        for client_socket in failed:
            self.remove_client(client_socket)


if __name__ == "__main__":
    chat_server = ChatServer()
    chat_server.start()
The following code can be used to telnet into a BPQ32 node and tell it to connect to a specific application/alias. It complements the chat server above and provides an interface for interacting with it. This proof-of-concept code can be adapted so that any data can be sent or received by any application.
import os
import socket
import sys
import threading
import time

HOST = 'HOSTIP'  # Replace with the IP address or hostname of the server
PORT = 8010
USERNAME = 'TCPUSERNAME'
PASSWORD = 'TCPPASSWORD'
NODE_ALIAS = 'NODEALIAS'  # Replace with the application/alias to connect to


def main():
    """Log in to the node, connect to NODE_ALIAS, and relay console I/O.

    Lines typed on stdin are sent to the server; everything the server
    sends is printed by a background thread. Typing "exit" (any case),
    end-of-input, or a send failure ends the session.
    """
    print('Welcome to Game Client')

    # Create a socket and connect to the server. OSError also covers
    # timeouts and name-resolution failures, not just a refused connection.
    try:
        sock = socket.create_connection((HOST, PORT))
    except OSError:
        print(f"Failed to connect to {HOST}:{PORT}")
        sys.exit(1)

    # Receive and print the login prompt
    login_prompt = sock.recv(1024).decode('utf-8', errors='ignore')
    print(login_prompt)

    # Send the username
    sock.sendall((USERNAME + "\r\n").encode('utf-8'))

    # Give the server a moment, then receive and print the password prompt
    time.sleep(1)
    password_prompt = sock.recv(1024).decode('utf-8', errors='ignore')
    print(password_prompt)

    # Send the password
    sock.sendall((PASSWORD + "\r\n").encode('utf-8'))

    def receive_responses():
        """Print server output until the connection ends."""
        while True:
            try:
                data = sock.recv(1024)
            except OSError:
                break  # socket closed locally or connection failed
            if not data:
                # Empty read means the peer closed the connection; without
                # this check the loop would spin forever printing blanks.
                print('Connection closed by server')
                break
            print(data.decode('utf-8', errors='ignore'))

    # Daemon thread so a receiver stuck in recv() cannot block process exit.
    response_thread = threading.Thread(target=receive_responses, daemon=True)
    response_thread.start()

    # Wait for 3 seconds so the login can complete
    time.sleep(3)

    # Send the connect command for the configured alias
    sock.sendall(("c " + NODE_ALIAS + "\r\n").encode('utf-8'))
    # 'clear' does not exist on Windows; use 'cls' there.
    os.system('cls' if os.name == 'nt' else 'clear')
    print('Connected to Game Server ' + NODE_ALIAS)

    # Main thread: forward typed commands to the server
    try:
        while True:
            try:
                command = input()
            except EOFError:
                break  # stdin closed
            try:
                sock.sendall((command + "\r\n").encode('utf-8'))
            except OSError:
                break  # connection is gone
            if command.lower() == "exit":
                break
    finally:
        # Half-close to signal we are done sending, give the receiver a
        # bounded time to print any final output, then close.
        try:
            sock.shutdown(socket.SHUT_WR)
        except OSError:
            pass
        response_thread.join(timeout=5)
        sock.close()


if __name__ == "__main__":
    main()