packet:codedumps
This is an old revision of the document!
Table of Contents
Simple chat server that listens on port 33000 and can be connected to from BPQ32
import socket
import threading


class ChatServer:
    """Minimal threaded TCP chat server.

    Each accepted connection is served by its own thread. The first chunk a
    client sends is taken as its username; every later chunk is relayed to
    all connected clients (including the sender).
    """

    def __init__(self, host='localhost', port=33000):
        """Store the listen address and initialise the empty client registry.

        Args:
            host: Interface to bind to. Defaults to 'localhost'.
            port: TCP port to listen on. Defaults to 33000.
        """
        self.host = host
        self.port = port
        self.server = None
        # Maps client socket -> username; guarded by self.lock.
        self.clients = {}
        self.lock = threading.Lock()

    def start(self):
        """Bind, listen and accept clients forever, one thread per client.

        The listening socket is closed when the accept loop is left by an
        exception (for example KeyboardInterrupt or a bind failure).
        """
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server.bind((self.host, self.port))
            self.server.listen(5)
            print("Chat server started on {}:{}".format(self.host, self.port))
            while True:
                client_socket, client_address = self.server.accept()
                print("New connection from:", client_address)
                # Daemon threads so that stopping the server is not blocked
                # by handlers waiting in recv().
                threading.Thread(
                    target=self.handle_client,
                    args=(client_socket,),
                    daemon=True,
                ).start()
        finally:
            self.server.close()

    def handle_client(self, client_socket):
        """Register one client, relay its messages, and deregister it.

        Args:
            client_socket: Connected socket returned by accept().
        """
        try:
            raw_name = client_socket.recv(1024)
        except OSError:
            client_socket.close()
            return
        if not raw_name:
            # Peer disconnected before sending a username.
            client_socket.close()
            return

        # errors='replace' keeps the handler alive on non-UTF-8 input;
        # strip() drops the line terminator sent with the name.
        username = raw_name.decode('utf-8', errors='replace').strip()
        self.add_client(client_socket, username)
        try:
            while True:
                data = client_socket.recv(1024)
                if not data:
                    break
                self.broadcast(data.decode('utf-8', errors='replace'))
        except OSError:
            # Any socket failure (reset, timeout, closed by broadcast) is a
            # disconnect; cleanup happens in the finally clause.
            pass
        finally:
            self.remove_client(client_socket)

    def add_client(self, client_socket, username):
        """Record *client_socket* under *username* and announce the join."""
        with self.lock:
            self.clients[client_socket] = username
        print("New user joined:", username)

    def remove_client(self, client_socket):
        """Forget *client_socket* and close it.

        Safe to call more than once for the same socket: an unknown socket
        is simply closed and nothing is printed.
        """
        with self.lock:
            username = self.clients.pop(client_socket, None)
        client_socket.close()
        if username is not None:
            print("User disconnected:", username)

    def broadcast(self, message):
        """Send *message* (a str) to every registered client.

        Clients whose socket fails during the send are removed instead of
        aborting delivery to the remaining clients.
        """
        payload = message.encode('utf-8')
        unreachable = []
        # Sends stay under the lock so concurrent broadcasts cannot
        # interleave their bytes on a socket.
        with self.lock:
            for client_socket in self.clients:
                try:
                    client_socket.sendall(payload)
                except OSError:
                    unreachable.append(client_socket)
        # remove_client takes the (non-reentrant) lock itself, so it must be
        # called only after the lock has been released.
        for client_socket in unreachable:
            self.remove_client(client_socket)


if __name__ == "__main__":
    chat_server = ChatServer()
    chat_server.start()
Simple chat client that logs in to BPQ over telnet and then connects to the chat server above
import socket
import sys
import threading
import time
import os

# Telnet client script: logs in to a BPQ node over TCP, issues a connect
# command for the chat server's node alias, then forwards typed lines to the
# node while a background thread prints whatever the node sends back.

HOST = 'HOSTIP'  # Replace with the IP address or hostname of the server
PORT = 8010
USERNAME = 'TCPUSERNAME'
PASSWORD = 'TCPPASSWORD'

print('Welcome to Game Client')

# Create a socket and connect to the server. OSError also covers name
# resolution failures and timeouts, not only a refused connection.
try:
    sock = socket.create_connection((HOST, PORT))
except OSError:
    print(f"Failed to connect to {HOST}:{PORT}")
    sys.exit(1)

# Receive and print the login prompt
login_prompt = sock.recv(1024).decode('utf-8', errors='ignore')
print(login_prompt)

# Send the username
sock.sendall((USERNAME + "\r\n").encode('utf-8'))

# Give the server a moment, then receive and print the password prompt
time.sleep(1)
password_prompt = sock.recv(1024).decode('utf-8', errors='ignore')
print(password_prompt)

# Send the password
sock.sendall((PASSWORD + "\r\n").encode('utf-8'))


def receive_responses():
    """Print server output until the connection ends.

    Returns when recv() yields no data (the peer closed the connection) or
    the socket fails or is closed locally.
    """
    while True:
        try:
            data = sock.recv(1024)
        except OSError:
            break
        if not data:
            # Empty read means the peer closed; looping on would spin.
            print('Connection closed by server')
            break
        print(data.decode('utf-8', errors='ignore'))


# Create and start the thread for receiving responses. It is a daemon so a
# receiver still blocked in recv() cannot keep the process alive at exit.
response_thread = threading.Thread(target=receive_responses, daemon=True)
response_thread.start()

# Wait for 3 seconds so the login can complete
time.sleep(3)

# Send the connect command for the chat server's node alias
sock.sendall(("c NODEALIAS" + "\r\n").encode('utf-8'))
# 'clear' does not exist on Windows; use 'cls' there.
os.system('cls' if os.name == 'nt' else 'clear')
print('Connected to Game Server NODEALIAS')

# Main thread for sending commands
sent_exit = False
try:
    while True:
        try:
            command = input()
        except (EOFError, KeyboardInterrupt):
            # stdin closed or user interrupted: leave without a traceback.
            break
        # Send the command to the server
        try:
            sock.sendall((command + "\r\n").encode('utf-8'))
        except OSError:
            print('Connection lost')
            break
        if command.lower() == "exit":
            sent_exit = True
            break
    if sent_exit:
        # Allow the server's final output to arrive, but never wait forever.
        response_thread.join(timeout=5)
finally:
    sock.close()
packet/codedumps.1689530931.txt.gz · Last modified: 2023/07/16 18:08 by m0ukf