#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# LinuxGSM query_gsquery.py module
# Author: Daniel Gibbs
# Contributors: https://linuxgsm.com/contrib
# Website: https://linuxgsm.com
# Description: Allows querying of various game servers.
"""Query game servers using a set of lightweight UDP protocols."""
- import argparse
- import socket
- import sys
# Engine identifiers offered as the valid choices of the -e/--engine option.
# Order is kept stable because it is echoed verbatim in the option's help text.
engine_types = (
    # Generic protocol families.
    "protocol-valve", "protocol-quake2", "protocol-quake3",
    "protocol-gamespy1", "protocol-unreal2",
    # Game- or service-specific protocols.
    "ut3", "minecraft", "minecraftbe", "jc2m",
    "mumbleping", "soldat", "teeworlds",
)
class GSQuery:
    """Game server query dispatcher.

    Picks the UDP request payload that belongs to the requested engine, sends
    it once to ``address:port`` and reports the outcome through the process
    exit status (every path of :meth:`responding` ends in ``sys.exit``).

    Exit codes:
        0 -- a response of at least five bytes was received.
        1 -- unsupported engine, or the socket could not connect or send.
        2 -- nothing could be received (this includes the receive timeout).
        3 -- the response was missing or shorter than five bytes.
    """

    # Socket timeout, in seconds, applied to connect/send/recv.
    server_response_timeout = 2
    # Maximum number of bytes read from the reply datagram.
    default_buffer_length = 1024

    # Engine identifiers grouped by the query payload they share.
    sourcequery = (
        "protocol-valve",
        "avalanche3.0",
        "barotrauma",
        "madness",
        "quakelive",
        "realvirtuality",
        "refractor",
        "source",
        "goldsrc",
        "spark",
        "starbound",
        "unity3d",
        "unreal4",
        "wurm",
    )
    idtech2query = ("protocol-quake2", "idtech2", "quake", "iw2.0")
    idtech3query = ("protocol-quake3", "iw3.0", "ioquake3", "qfusion")
    minecraftquery = ("minecraft", "lwjgl2")
    minecraftbequery = ("minecraftbe",)
    jc2mquery = ("jc2m",)
    mumblequery = ("mumbleping",)
    soldatquery = ("soldat",)
    twquery = ("teeworlds",)
    unrealquery = ("protocol-gamespy1", "unreal")
    unreal2query = ("protocol-unreal2", "unreal2")
    unreal3query = ("ut3", "unreal3")

    def __init__(self, arguments):
        """Create a query instance from parsed CLI args.

        Args:
            arguments: Object exposing ``engine``, ``address`` and ``port``
                attributes (normally the ``argparse`` namespace).

        ``query_prompt_string`` is set to the payload for the engine, or to
        ``None`` when the engine belongs to no known family; ``responding``
        turns that ``None`` into a clean error exit.
        """
        self.argument = arguments
        # Checked in order; the first family containing the engine wins.
        payload_table = (
            (self.sourcequery, b"\xff\xff\xff\xffTSource Engine Query\0"),
            (self.idtech2query, b"\xff\xff\xff\xffstatus\x00"),
            (self.idtech3query, b"\xff\xff\xff\xffgetstatus"),
            (self.jc2mquery, b"\xfe\xfd\x09\x10\x20\x30\x40"),
            (self.minecraftquery, b"\xfe\xfd\x09\x3d\x54\x1f\x93"),
            (
                self.minecraftbequery,
                b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00"
                b"\xfe\xfe\xfe\xfe\xfd\xfd\xfd\xfd\x12\x34\x56\x78"
                b"\x00\x00\x00\x00\x00\x00\x00\x00",
            ),
            (
                self.mumblequery,
                b"\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08",
            ),
            (self.soldatquery, b"\x69\x00"),
            # Eight header bytes followed by 511 zero bytes of padding.
            (self.twquery, b"\x04\x00\x00\xff\xff\xff\xff\x05" + bytes(511)),
            (self.unrealquery, b"\x5c\x69\x6e\x66\x6f\x5c"),
            (self.unreal2query, b"\x79\x00\x00\x00\x00"),
            (self.unreal3query, b"\xfe\xfd\x09\x00\x00\x00\x00"),
        )
        self.query_prompt_string = None
        for engines, payload in payload_table:
            if self.argument.engine in engines:
                self.query_prompt_string = payload
                break
        self.connected = False
        self.response = None

    @staticmethod
    def fatal_error(error_message, error_code=1):
        """Write ``ERROR: <message>`` to stderr and exit with *error_code*."""
        sys.stderr.write("ERROR: " + str(error_message) + "\n")
        sys.exit(error_code)

    @staticmethod
    def exit_success(success_message=""):
        """Write ``OK: <message>`` to stdout and exit with status 0."""
        sys.stdout.write("OK: " + str(success_message) + "\n")
        sys.exit(0)

    def responding(self):
        """Send a single UDP query and print the response.

        Never returns normally: it always finishes through ``exit_success``
        or ``fatal_error`` and therefore raises ``SystemExit``.
        """
        if self.query_prompt_string is None:
            # No payload is known for this engine, so there is nothing to send.
            self.fatal_error(
                "Unsupported engine: " + str(self.argument.engine), 1
            )
        # The context manager closes the socket on every path, including the
        # SystemExit raised by fatal_error.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as connection:
            connection.settimeout(self.server_response_timeout)
            # Connect.
            try:
                connection.connect(
                    (self.argument.address, int(self.argument.port))
                )
            except socket.timeout:
                self.fatal_error("Request timed out", 1)
            except (OSError, OverflowError, ValueError):
                # OverflowError: port outside 0-65535; ValueError: port that
                # cannot be converted to an integer.
                self.fatal_error("Unable to connect", 1)
            # connect() returns None, so record success explicitly.
            self.connected = True
            # Send.
            try:
                connection.send(self.query_prompt_string)
            except OSError:
                self.fatal_error("Unable to send", 1)
            # Receive.
            try:
                self.response = connection.recv(self.default_buffer_length)
            except OSError:
                self.fatal_error("Unable to receive", 2)
        # Response.
        if self.response is None:
            self.fatal_error("No response", 3)
        if len(self.response) < 5:
            # Replies shorter than five bytes are reported as a failure.
            self.fatal_error("Short response.", 3)
        self.exit_success(str(self.response))
def parse_args(argv=None):
    """Parse command-line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` holding ``address``, ``port``, ``engine``,
        ``verbose`` and ``debug``.

    On invalid or missing options argparse prints a message and exits with
    status 2.
    """
    parser = argparse.ArgumentParser(
        description="Allows querying of various game servers.",
        # argparse prepends "usage: " itself, so it must not be repeated here.
        usage="python3 %(prog)s [options]",
        # The built-in -h is disabled so help can be declared explicitly below.
        add_help=False,
    )
    parser.add_argument(
        "-a",
        "--address",
        type=str,
        required=True,
        help="The IPv4 address of the server.",
    )
    parser.add_argument(
        "-p",
        "--port",
        type=int,
        required=True,
        help="The IPv4 port of the server.",
    )
    parser.add_argument(
        "-e",
        "--engine",
        metavar="ENGINE",
        choices=engine_types,
        # A query payload cannot be chosen without an engine, so fail at
        # parse time instead of later during the query.
        required=True,
        help="Engine type: " + " ".join(engine_types),
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Display verbose output.",
    )
    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="Display debugging output.",
    )
    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version="%(prog)s 0.0.1",
        help="Display version and exit.",
    )
    parser.add_argument("-h", "--help", action="help", help="Display help and exit.")
    return parser.parse_args(argv)
def main():
    """Run one query described by the command-line options."""
    # Parse the options, build the query object and fire the query.
    GSQuery(parse_args()).responding()


if __name__ == "__main__":
    main()
|