#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# LinuxGSM query_gsquery.py module
# Author: Daniel Gibbs
# Contributors: https://linuxgsm.com/contrib
# Website: https://linuxgsm.com
# Description: Allows querying of various game servers.
"""Query game servers using a set of lightweight UDP protocols."""
  9. import argparse
  10. import socket
  11. import sys
  12. engine_types = (
  13. "protocol-valve",
  14. "protocol-quake2",
  15. "protocol-quake3",
  16. "protocol-gamespy1",
  17. "protocol-unreal2",
  18. "ut3",
  19. "minecraft",
  20. "minecraftbe",
  21. "jc2m",
  22. "mumbleping",
  23. "soldat",
  24. "teeworlds",
  25. )
  26. class GSQuery:
  27. """Game server query dispatcher."""
  28. server_response_timeout = 2
  29. default_buffer_length = 1024
  30. sourcequery = (
  31. "protocol-valve",
  32. "avalanche3.0",
  33. "barotrauma",
  34. "madness",
  35. "quakelive",
  36. "realvirtuality",
  37. "refractor",
  38. "source",
  39. "goldsrc",
  40. "spark",
  41. "starbound",
  42. "unity3d",
  43. "unreal4",
  44. "wurm",
  45. )
  46. idtech2query = ("protocol-quake2", "idtech2", "quake", "iw2.0")
  47. idtech3query = ("protocol-quake3", "iw3.0", "ioquake3", "qfusion")
  48. minecraftquery = ("minecraft", "lwjgl2")
  49. minecraftbequery = ("minecraftbe",)
  50. jc2mquery = ("jc2m",)
  51. mumblequery = ("mumbleping",)
  52. soldatquery = ("soldat",)
  53. twquery = ("teeworlds",)
  54. unrealquery = ("protocol-gamespy1", "unreal")
  55. unreal2query = ("protocol-unreal2", "unreal2")
  56. unreal3query = ("ut3", "unreal3")
  57. def __init__(self, arguments):
  58. """Create a query instance from parsed CLI args."""
  59. self.argument = arguments
  60. #
  61. if self.argument.engine in self.sourcequery:
  62. self.query_prompt_string = b"\xff\xff\xff\xffTSource Engine Query\0"
  63. elif self.argument.engine in self.idtech2query:
  64. self.query_prompt_string = b"\xff\xff\xff\xffstatus\x00"
  65. elif self.argument.engine in self.idtech3query:
  66. self.query_prompt_string = b"\xff\xff\xff\xffgetstatus"
  67. elif self.argument.engine in self.jc2mquery:
  68. self.query_prompt_string = b"\xfe\xfd\x09\x10\x20\x30\x40"
  69. elif self.argument.engine in self.minecraftquery:
  70. self.query_prompt_string = b"\xfe\xfd\x09\x3d\x54\x1f\x93"
  71. elif self.argument.engine in self.minecraftbequery:
  72. self.query_prompt_string = (
  73. b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00"
  74. b"\xfe\xfe\xfe\xfe\xfd\xfd\xfd\xfd\x12\x34\x56\x78"
  75. b"\x00\x00\x00\x00\x00\x00\x00\x00"
  76. )
  77. elif self.argument.engine in self.mumblequery:
  78. self.query_prompt_string = (
  79. b"\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08"
  80. )
  81. elif self.argument.engine in self.soldatquery:
  82. self.query_prompt_string = b"\x69\x00"
  83. elif self.argument.engine in self.twquery:
  84. self.query_prompt_string = b"\x04\x00\x00\xff\xff\xff\xff\x05" + bytearray(
  85. 511
  86. )
  87. elif self.argument.engine in self.unrealquery:
  88. self.query_prompt_string = b"\x5c\x69\x6e\x66\x6f\x5c"
  89. elif self.argument.engine in self.unreal2query:
  90. self.query_prompt_string = b"\x79\x00\x00\x00\x00"
  91. elif self.argument.engine in self.unreal3query:
  92. self.query_prompt_string = b"\xfe\xfd\x09\x00\x00\x00\x00"
  93. self.connected = False
  94. self.response = None
  95. @staticmethod
  96. def fatal_error(error_message, error_code=1):
  97. """Write an error message to stderr and exit."""
  98. sys.stderr.write("ERROR: " + str(error_message) + "\n")
  99. sys.exit(error_code)
  100. @staticmethod
  101. def exit_success(success_message=""):
  102. """Write a success message to stdout and exit."""
  103. sys.stdout.write("OK: " + str(success_message) + "\n")
  104. sys.exit(0)
  105. def responding(self):
  106. """Send a single UDP query and print the response."""
  107. # Connect.
  108. connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  109. connection.settimeout(self.server_response_timeout)
  110. try:
  111. self.connected = connection.connect(
  112. (self.argument.address, int(self.argument.port))
  113. )
  114. except socket.timeout:
  115. self.fatal_error("Request timed out", 1)
  116. except OSError:
  117. self.fatal_error("Unable to connect", 1)
  118. # Send.
  119. connection.send(self.query_prompt_string)
  120. # Receive.
  121. try:
  122. self.response = connection.recv(self.default_buffer_length)
  123. except socket.error:
  124. self.fatal_error("Unable to receive", 2)
  125. connection.close()
  126. # Response.
  127. if self.response is None:
  128. self.fatal_error("No response", 3)
  129. if len(self.response) < 5:
  130. sys.exit("Short response.", 3)
  131. else:
  132. self.exit_success(str(self.response))
  133. def parse_args():
  134. """Parse command-line arguments."""
  135. parser = argparse.ArgumentParser(
  136. description="Allows querying of various game servers.",
  137. usage="usage: python3 %(prog)s [options]",
  138. add_help=False,
  139. )
  140. parser.add_argument(
  141. "-a",
  142. "--address",
  143. type=str,
  144. required=True,
  145. help="The IPv4 address of the server.",
  146. )
  147. parser.add_argument(
  148. "-p",
  149. "--port",
  150. type=int,
  151. required=True,
  152. help="The IPv4 port of the server.",
  153. )
  154. parser.add_argument(
  155. "-e",
  156. "--engine",
  157. metavar="ENGINE",
  158. choices=engine_types,
  159. help="Engine type: " + " ".join(engine_types),
  160. )
  161. parser.add_argument(
  162. "-v",
  163. "--verbose",
  164. action="store_true",
  165. help="Display verbose output.",
  166. )
  167. parser.add_argument(
  168. "-d",
  169. "--debug",
  170. action="store_true",
  171. help="Display debugging output.",
  172. )
  173. parser.add_argument(
  174. "-V",
  175. "--version",
  176. action="version",
  177. version="%(prog)s 0.0.1",
  178. help="Display version and exit.",
  179. )
  180. parser.add_argument("-h", "--help", action="help", help="Display help and exit.")
  181. return parser.parse_args()
  182. def main():
  183. """CLI entrypoint."""
  184. arguments = parse_args()
  185. server = GSQuery(arguments)
  186. server.responding()
  187. if __name__ == "__main__":
  188. main()