| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- #! /usr/bin/env python3
- # -*- coding: utf-8 -*-
- # LinuxGSM query_gsquery.py module
- # Author: Daniel Gibbs
- # Contributors: http://linuxgsm.com/contrib
- # Website: https://linuxgsm.com
- # Description: Allows querying of various game servers.
- import argparse
- import socket
- import sys
- engine_types = ('protocol-valve', 'protocol-quake2', 'protocol-quake3', 'protocol-gamespy1',
- 'protocol-unreal2', 'ut3', 'minecraft', 'minecraftbe', 'jc2m', 'mumbleping', 'soldat', 'teeworlds')
- class gsquery:
- server_response_timeout = 2
- default_buffer_length = 1024
- sourcequery = ('protocol-valve', 'avalanche3.0', 'barotrauma', 'madness', 'quakelive', 'realvirtuality',
- 'refractor', 'source', 'goldsrc', 'spark', 'starbound', 'unity3d', 'unreal4', 'wurm')
- idtech2query = ('protocol-quake2', 'idtech2', 'quake', 'iw2.0')
- idtech3query = ('protocol-quake3', 'iw3.0', 'ioquake3', 'qfusion')
- minecraftquery = ('minecraft', 'lwjgl2')
- minecraftbequery = ('minecraftbe',)
- jc2mquery = ('jc2m',)
- mumblequery = ('mumbleping',)
- soldatquery = ('soldat',)
- twquery = ('teeworlds',)
- unrealquery = ('protocol-gamespy1', 'unreal')
- unreal2query = ('protocol-unreal2', 'unreal2')
- unreal3query = ('ut3', 'unreal3')
- def __init__(self, arguments):
- self.argument = arguments
- #
- if self.argument.engine in self.sourcequery:
- self.query_prompt_string = b'\xFF\xFF\xFF\xFFTSource Engine Query\0'
- elif self.argument.engine in self.idtech2query:
- self.query_prompt_string = b'\xff\xff\xff\xffstatus\x00'
- elif self.argument.engine in self.idtech3query:
- self.query_prompt_string = b'\xff\xff\xff\xffgetstatus'
- elif self.argument.engine in self.jc2mquery:
- self.query_prompt_string = b'\xFE\xFD\x09\x10\x20\x30\x40'
- elif self.argument.engine in self.minecraftquery:
- self.query_prompt_string = b'\xFE\xFD\x09\x3d\x54\x1f\x93'
- elif self.argument.engine in self.minecraftbequery:
- self.query_prompt_string = b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\xfe\xfe\xfe\xfe\xfd\xfd\xfd\xfd\x12\x34\x56\x78\x00\x00\x00\x00\x00\x00\x00\x00'
- elif self.argument.engine in self.mumblequery:
- self.query_prompt_string = b'\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08'
- elif self.argument.engine in self.soldatquery:
- self.query_prompt_string = b'\x69\x00'
- elif self.argument.engine in self.twquery:
- self.query_prompt_string = b'\x04\x00\x00\xff\xff\xff\xff\x05' + \
- bytearray(511)
- elif self.argument.engine in self.unrealquery:
- self.query_prompt_string = b'\x5C\x69\x6E\x66\x6F\x5C'
- elif self.argument.engine in self.unreal2query:
- self.query_prompt_string = b'\x79\x00\x00\x00\x00'
- elif self.argument.engine in self.unreal3query:
- self.query_prompt_string = b'\xFE\xFD\x09\x00\x00\x00\x00'
- self.connected = False
- self.response = None
- @staticmethod
- def fatal_error(error_message, error_code=1):
- sys.stderr.write('ERROR: ' + str(error_message) + '\n')
- sys.exit(error_code)
- @staticmethod
- def exit_success(success_message=''):
- sys.stdout.write('OK: ' + str(success_message) + '\n')
- sys.exit(0)
- def responding(self):
- # Connect.
- connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- connection.settimeout(self.server_response_timeout)
- try:
- self.connected = connection.connect(
- (self.argument.address, int(self.argument.port)))
- except socket.timeout:
- self.fatal_error('Request timed out', 1)
- except Exception:
- self.fatal_error('Unable to connect', 1)
- # Send.
- connection.send(self.query_prompt_string)
- # Receive.
- try:
- self.response = connection.recv(self.default_buffer_length)
- except socket.error:
- self.fatal_error('Unable to receive', 2)
- connection.close()
- # Response.
- if self.response is None:
- self.fatal_error('No response', 3)
- if len(self.response) < 5:
- sys.exit('Short response.', 3)
- else:
- self.exit_success(str(self.response))
- def parse_args():
- parser = argparse.ArgumentParser(
- description='Allows querying of various game servers.',
- usage='usage: python3 %(prog)s [options]',
- add_help=False
- )
- parser.add_argument(
- '-a', '--address',
- type=str,
- required=True,
- help='The IPv4 address of the server.'
- )
- parser.add_argument(
- '-p', '--port',
- type=int,
- required=True,
- help='The IPv4 port of the server.'
- )
- parser.add_argument(
- '-e', '--engine',
- metavar='ENGINE',
- choices=engine_types,
- help='Engine type: ' + ' '.join(engine_types)
- )
- parser.add_argument(
- '-v', '--verbose',
- action='store_true',
- help='Display verbose output.'
- )
- parser.add_argument(
- '-d', '--debug',
- action='store_true',
- help='Display debugging output.'
- )
- parser.add_argument(
- '-V', '--version',
- action='version',
- version='%(prog)s 0.0.1',
- help='Display version and exit.'
- )
- parser.add_argument(
- '-h', '--help',
- action='help',
- help='Display help and exit.'
- )
- return parser.parse_args()
- def main():
- arguments = parse_args()
- server = gsquery(arguments)
- server.responding()
- if __name__ == '__main__':
- main()
|