#! /usr/bin/env python3 # -*- coding: utf-8 -*- # LinuxGSM query_gsquery.py module # Author: Daniel Gibbs # Contributors: https://linuxgsm.com/contrib # Website: https://linuxgsm.com # Description: Allows querying of various game servers. """Query game servers using a set of lightweight UDP protocols.""" import argparse import socket import sys engine_types = ( "protocol-valve", "protocol-quake2", "protocol-quake3", "protocol-gamespy1", "protocol-unreal2", "ut3", "minecraft", "minecraftbe", "jc2m", "mumbleping", "soldat", "teeworlds", ) class GSQuery: """Game server query dispatcher.""" server_response_timeout = 2 default_buffer_length = 1024 sourcequery = ( "protocol-valve", "avalanche3.0", "barotrauma", "madness", "quakelive", "realvirtuality", "refractor", "source", "goldsrc", "spark", "starbound", "unity3d", "unreal4", "wurm", ) idtech2query = ("protocol-quake2", "idtech2", "quake", "iw2.0") idtech3query = ("protocol-quake3", "iw3.0", "ioquake3", "qfusion") minecraftquery = ("minecraft", "lwjgl2") minecraftbequery = ("minecraftbe",) jc2mquery = ("jc2m",) mumblequery = ("mumbleping",) soldatquery = ("soldat",) twquery = ("teeworlds",) unrealquery = ("protocol-gamespy1", "unreal") unreal2query = ("protocol-unreal2", "unreal2") unreal3query = ("ut3", "unreal3") def __init__(self, arguments): """Create a query instance from parsed CLI args.""" self.argument = arguments # if self.argument.engine in self.sourcequery: self.query_prompt_string = b"\xff\xff\xff\xffTSource Engine Query\0" elif self.argument.engine in self.idtech2query: self.query_prompt_string = b"\xff\xff\xff\xffstatus\x00" elif self.argument.engine in self.idtech3query: self.query_prompt_string = b"\xff\xff\xff\xffgetstatus" elif self.argument.engine in self.jc2mquery: self.query_prompt_string = b"\xfe\xfd\x09\x10\x20\x30\x40" elif self.argument.engine in self.minecraftquery: self.query_prompt_string = b"\xfe\xfd\x09\x3d\x54\x1f\x93" elif self.argument.engine in self.minecraftbequery: self.query_prompt_string = ( b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00" b"\xfe\xfe\xfe\xfe\xfd\xfd\xfd\xfd\x12\x34\x56\x78" b"\x00\x00\x00\x00\x00\x00\x00\x00" ) elif self.argument.engine in self.mumblequery: self.query_prompt_string = ( b"\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08" ) elif self.argument.engine in self.soldatquery: self.query_prompt_string = b"\x69\x00" elif self.argument.engine in self.twquery: self.query_prompt_string = b"\x04\x00\x00\xff\xff\xff\xff\x05" + bytearray( 511 ) elif self.argument.engine in self.unrealquery: self.query_prompt_string = b"\x5c\x69\x6e\x66\x6f\x5c" elif self.argument.engine in self.unreal2query: self.query_prompt_string = b"\x79\x00\x00\x00\x00" elif self.argument.engine in self.unreal3query: self.query_prompt_string = b"\xfe\xfd\x09\x00\x00\x00\x00" self.connected = False self.response = None @staticmethod def fatal_error(error_message, error_code=1): """Write an error message to stderr and exit.""" sys.stderr.write("ERROR: " + str(error_message) + "\n") sys.exit(error_code) @staticmethod def exit_success(success_message=""): """Write a success message to stdout and exit.""" sys.stdout.write("OK: " + str(success_message) + "\n") sys.exit(0) def responding(self): """Send a single UDP query and print the response.""" # Connect. connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) connection.settimeout(self.server_response_timeout) try: self.connected = connection.connect( (self.argument.address, int(self.argument.port)) ) except socket.timeout: self.fatal_error("Request timed out", 1) except OSError: self.fatal_error("Unable to connect", 1) # Send. connection.send(self.query_prompt_string) # Receive. try: self.response = connection.recv(self.default_buffer_length) except socket.error: self.fatal_error("Unable to receive", 2) connection.close() # Response. if self.response is None: self.fatal_error("No response", 3) if len(self.response) < 5: sys.exit("Short response.", 3) else: self.exit_success(str(self.response)) def parse_args(): """Parse command-line arguments.""" parser = argparse.ArgumentParser( description="Allows querying of various game servers.", usage="usage: python3 %(prog)s [options]", add_help=False, ) parser.add_argument( "-a", "--address", type=str, required=True, help="The IPv4 address of the server.", ) parser.add_argument( "-p", "--port", type=int, required=True, help="The IPv4 port of the server.", ) parser.add_argument( "-e", "--engine", metavar="ENGINE", choices=engine_types, help="Engine type: " + " ".join(engine_types), ) parser.add_argument( "-v", "--verbose", action="store_true", help="Display verbose output.", ) parser.add_argument( "-d", "--debug", action="store_true", help="Display debugging output.", ) parser.add_argument( "-V", "--version", action="version", version="%(prog)s 0.0.1", help="Display version and exit.", ) parser.add_argument("-h", "--help", action="help", help="Display help and exit.") return parser.parse_args() def main(): """CLI entrypoint.""" arguments = parse_args() server = GSQuery(arguments) server.responding() if __name__ == "__main__": main()