Source code for gobigger.server.server

import random
from easydict import EasyDict
import uuid
import logging
import cv2
import os
import sys
import time
import numpy as np
import copy
import pickle
import math

os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
os.environ['SDL_AUDIODRIVER'] = 'dsp'

from pygame.math import Vector2

from gobigger.utils import Border, create_collision_detection, deep_merge_dicts, PlayerStatesUtil, SequenceGenerator
from gobigger.playbacks import create_pb
from gobigger.balls import FoodBall, ThornsBall, CloneBall, SporeBall
from gobigger.managers import FoodManager, SporeManager, ThornsManager, PlayerManager
from gobigger.configs import server_default_config


[docs]class Server: @staticmethod def default_config(): cfg = copy.deepcopy(server_default_config) return EasyDict(cfg) def __init__(self, cfg=None, seed=None): self.cfg = Server.default_config() if isinstance(cfg, dict): cfg = EasyDict(cfg) self.cfg = deep_merge_dicts(self.cfg, cfg) self.update_match_ratio() # update match ratio logging.debug(self.cfg) self.team_num = self.cfg.team_num self.player_num_per_team = self.cfg.player_num_per_team self.map_width = self.cfg.map_width self.map_height = self.cfg.map_height self.frame_limit = self.cfg.frame_limit self.fps = self.cfg.fps self.frame_duration = 1 / self.fps self.collision_detection_type = self.cfg.collision_detection_type self.eat_ratio = self.cfg.eat_ratio self.playback_settings = self.cfg.playback_settings self.opening_settings = self.cfg.opening_settings self.manager_settings = self.cfg.manager_settings self.obs_settings = self.cfg.obs_settings self.seed(seed) self.border = Border(0, 0, self.map_width, self.map_height, self._random) self.last_frame_count = 0 self.init_playback() self.init_opening() self.sequence_generator = SequenceGenerator() self.food_manager = FoodManager(self.manager_settings.food_manager, border=self.border, random_generator=self._random, sequence_generator=self.sequence_generator) self.thorns_manager = ThornsManager(self.manager_settings.thorns_manager, border=self.border, random_generator=self._random, sequence_generator=self.sequence_generator) self.spore_manager = SporeManager(self.manager_settings.spore_manager, border=self.border, random_generator=self._random, sequence_generator=self.sequence_generator) self.player_manager = PlayerManager(self.manager_settings.player_manager, border=self.border, team_num=self.team_num, player_num_per_team=self.player_num_per_team, spore_manager_settings=self.cfg.manager_settings.spore_manager, random_generator=self._random, sequence_generator=self.sequence_generator) self.init_obs() self.collision_detection = create_collision_detection(self.collision_detection_type, border=self.border) def update_match_ratio(self): # map size self.cfg.map_width = int(self.cfg.map_width * math.sqrt(self.cfg.match_ratio)) self.cfg.map_height = int(self.cfg.map_height * math.sqrt(self.cfg.match_ratio)) # food self.cfg.manager_settings.food_manager.num_init = int(self.cfg.manager_settings.food_manager.num_init * self.cfg.match_ratio) self.cfg.manager_settings.food_manager.num_min = int(self.cfg.manager_settings.food_manager.num_min * self.cfg.match_ratio) self.cfg.manager_settings.food_manager.num_max = int(self.cfg.manager_settings.food_manager.num_max * self.cfg.match_ratio) # thorns self.cfg.manager_settings.thorns_manager.num_init = int(self.cfg.manager_settings.thorns_manager.num_init * self.cfg.match_ratio) self.cfg.manager_settings.thorns_manager.num_min = int(self.cfg.manager_settings.thorns_manager.num_min * self.cfg.match_ratio) self.cfg.manager_settings.thorns_manager.num_max = int(self.cfg.manager_settings.thorns_manager.num_max * self.cfg.match_ratio) def init_playback(self): self.diff_balls_remove = [[], [], [], []] self.diff_balls_modify = [{}, {}, {}, {}] self.playback_type = self.playback_settings.playback_type self.save_video = self.playback_settings.by_video.save_video self.save_frame = self.playback_settings.by_frame.save_frame self.playback_util = create_pb(self.playback_settings, fps=self.fps, map_width=self.map_width, map_height=self.map_height) def init_opening(self): self.custom_init_food = [] self.custom_init_thorns = [] self.custom_init_spore = [] self.custom_init_clone = [] opening_type = self.opening_settings.opening_type if opening_type == 'none': pass elif opening_type == 'handcraft': self.custom_init_food = self.opening_settings.handcraft.food self.custom_init_thorns = self.opening_settings.handcraft.thorns self.custom_init_spore = self.opening_settings.handcraft.spore self.custom_init_clone = self.opening_settings.handcraft.clone elif opening_type == 'from_frame': if self.frame_path and os.path.isfile(self.frame_path): with open(self.frame_path, 'rb') as f: data = pickle.load(f) self.custom_init_food = data['food'] self.custom_init_thorns = data['thorns'] self.custom_init_spore = data['spore'] self.custom_init_clone = data['clone'] def init_obs(self): self.eats = {player_id: {'food': 0, 'thorns': 0, 'spore': 0, 'clone_self': 0, 'clone_team': 0, 'clone_other': 0, 'eaten': 0} \ for player_id in self.player_manager.get_player_names()} self.player_states_util = PlayerStatesUtil(self.obs_settings)
[docs] def spawn_balls(self): ''' Overview: Initialize all balls. If self.custom_init is set, initialize all balls based on it. ''' self.food_manager.init_balls(custom_init=self.custom_init_food) # init food self.thorns_manager.init_balls(custom_init=self.custom_init_thorns) # init thorns self.spore_manager.init_balls(custom_init=self.custom_init_spore) # init spore self.player_manager.init_balls(custom_init=self.custom_init_clone) # init player if self.save_frame: for ball in self.food_manager.get_balls(): self.diff_balls_modify[0][ball.ball_id] = ball.save() for ball in self.thorns_manager.get_balls(): self.diff_balls_modify[1][ball.ball_id] = ball.save() for ball in self.spore_manager.get_balls(): self.diff_balls_modify[2][ball.ball_id] = ball.save() for ball in self.player_manager.get_balls(): self.diff_balls_modify[3][ball.ball_id] = ball.save()
def step_one_frame(self, actions=None): moving_balls = [] # Record all balls in motion total_balls = [] # Record all balls # Update all player balls according to action if actions is not None and isinstance(actions, dict): for player in self.player_manager.get_players(): if player.player_id in actions: direction_x, direction_y, action_type = actions[player.player_id] if direction_x is None or direction_y is None: direction = None else: direction = Vector2(direction_x, direction_y) if direction.length() > 1: direction = direction.normalize() if action_type == 1: # eject tmp_spore_balls = player.eject(direction=direction) for tmp_spore_ball in tmp_spore_balls: if tmp_spore_ball: self.spore_manager.add_balls(tmp_spore_ball) if self.save_frame: self.diff_balls_modify[2][tmp_spore_ball.ball_id] = tmp_spore_ball.save() elif action_type == 2: # split self.player_manager.add_balls(player.split(direction=direction)) player.move(direction=direction, duration=self.frame_duration) moving_balls.extend(player.get_balls()) else: player.move(duration=self.frame_duration) moving_balls.extend(player.get_balls()) else: for player in self.player_manager.get_players(): player.move(duration=self.frame_duration) moving_balls.extend(player.get_balls()) moving_balls = sorted(moving_balls, reverse=True) # Sort by size # Update the status of other balls after moving, and record the balls with status updates for thorns_ball in self.thorns_manager.get_balls(): if thorns_ball.moving: thorns_ball.move(duration=self.frame_duration) if self.save_frame: self.diff_balls_modify[1][thorns_ball.ball_id] = thorns_ball.save() moving_balls.append(thorns_ball) for spore_ball in self.spore_manager.get_balls(): if spore_ball.moving: spore_ball.move(duration=self.frame_duration) if self.save_frame: self.diff_balls_modify[2][spore_ball.ball_id] = spore_ball.save() # Adjust the position of all player balls eats = self.player_manager.adjust() for player_id, clone_self_num in eats.items(): self.eats[player_id]['clone_self'] += clone_self_num # Collision detection total_balls.extend(self.player_manager.get_balls()) total_balls.extend(self.thorns_manager.get_balls()) total_balls.extend(self.spore_manager.get_balls()) total_balls.extend(self.food_manager.get_balls()) collisions_dict = self.collision_detection.solve(moving_balls, total_balls) # Process each ball in moving_balls for index, moving_ball in enumerate(moving_balls): if not moving_ball.is_remove and index in collisions_dict: for target_ball in collisions_dict[index]: self.deal_with_collision(moving_ball, target_ball) # After each tick, check if there is a need to update food, thorns, and player rebirth new_food_balls = self.food_manager.step(duration=self.frame_duration) new_thorns_balls = self.thorns_manager.step(duration=self.frame_duration) self.spore_manager.step(duration=self.frame_duration) self.player_manager.step() self.last_frame_count += 1 if self.save_frame: self.diff_balls_modify[0].update(new_food_balls) self.diff_balls_modify[1].update(new_thorns_balls) for ball in self.player_manager.get_balls(): self.diff_balls_modify[3][ball.ball_id] = ball.save() def deal_with_collision(self, moving_ball, target_ball): if not moving_ball.is_remove and not target_ball.is_remove: # Ensure that the two balls are present if isinstance(moving_ball, CloneBall): if isinstance(target_ball, CloneBall): if moving_ball.team_id != target_ball.team_id: if moving_ball.score > target_ball.score and self.can_eat(moving_ball.score, target_ball.score): moving_ball.eat(target_ball) self.eats[moving_ball.player_id]['clone_other'] += 1 self.eats[target_ball.player_id]['eaten'] += 1 self.player_manager.remove_balls(target_ball) elif self.can_eat(target_ball.score, moving_ball.score): target_ball.eat(moving_ball) self.eats[target_ball.player_id]['clone_other'] += 1 self.eats[moving_ball.player_id]['eaten'] += 1 self.player_manager.remove_balls(moving_ball) elif moving_ball.player_id != target_ball.player_id: if moving_ball.score > target_ball.score and self.can_eat(moving_ball.score, target_ball.score): if self.player_manager.get_clone_num(target_ball) > 1: moving_ball.eat(target_ball) self.eats[moving_ball.player_id]['clone_team'] += 1 self.eats[target_ball.player_id]['eaten'] += 1 self.player_manager.remove_balls(target_ball) elif self.can_eat(target_ball.score, moving_ball.score): if self.player_manager.get_clone_num(moving_ball) > 1: target_ball.eat(moving_ball) self.eats[target_ball.player_id]['clone_team'] += 1 self.eats[moving_ball.player_id]['eaten'] += 1 self.player_manager.remove_balls(moving_ball) elif isinstance(target_ball, FoodBall): moving_ball.eat(target_ball) self.eats[moving_ball.player_id]['food'] += 1 if self.save_frame: self.diff_balls_remove[0].append(target_ball.ball_id) self.food_manager.remove_balls(target_ball) elif isinstance(target_ball, SporeBall): moving_ball.eat(target_ball) self.eats[moving_ball.player_id]['spore'] += 1 if self.save_frame: self.diff_balls_remove[2].append(target_ball.ball_id) self.spore_manager.remove_balls(target_ball) elif isinstance(target_ball, ThornsBall): if moving_ball.score > target_ball.score and self.can_eat(moving_ball.score, target_ball.score): ret = moving_ball.eat(target_ball, clone_num=self.player_manager.get_clone_num(moving_ball)) self.eats[moving_ball.player_id]['thorns'] += 1 if self.save_frame: self.diff_balls_remove[1].append(target_ball.ball_id) self.thorns_manager.remove_balls(target_ball) if isinstance(ret, list): self.player_manager.add_balls(ret) elif isinstance(moving_ball, ThornsBall): if isinstance(target_ball, CloneBall): if moving_ball.score < target_ball.score and self.can_eat(target_ball.score, moving_ball.score): ret = target_ball.eat(moving_ball, clone_num=self.player_manager.get_clone_num(target_ball)) self.eats[target_ball.player_id]['thorns'] += 1 if self.save_frame: self.diff_balls_remove[1].append(moving_ball.ball_id) self.thorns_manager.remove_balls(moving_ball) if isinstance(ret, list): self.player_manager.add_balls(ret) elif isinstance(target_ball, SporeBall): moving_ball.eat(target_ball) if self.save_frame: self.diff_balls_remove[2].append(target_ball.ball_id) self.spore_manager.remove_balls(target_ball) elif isinstance(moving_ball, SporeBall): if isinstance(target_ball, CloneBall) or isinstance(target_ball, ThornsBall): target_ball.eat(moving_ball) if isinstance(target_ball, CloneBall): self.eats[target_ball.player_id]['spore'] += 1 if self.save_frame: self.diff_balls_remove[2].append(moving_ball.ball_id) if isinstance(target_ball, ThornsBall): self.diff_balls_modify[1][target_ball.ball_id] = target_ball.save() self.spore_manager.remove_balls(moving_ball) else: return def can_eat(self, score1, score2): if score1 > self.eat_ratio * score2: return True else: return False def reset(self): self.last_frame_count = 0 self.init_playback() self.init_opening() self.food_manager.reset() self.thorns_manager.reset() self.spore_manager.reset() self.player_manager.reset() self.spawn_balls() self.init_obs() self._end_flag = False def step(self, actions=None, save_frame_full_path='', **kwargs): if not self._end_flag: self.step_one_frame(actions) if self.playback_util.need_save(self.last_frame_count): if self.save_video: self.playback_util.save_step(food_balls=self.food_manager.get_balls(), thorns_balls=self.thorns_manager.get_balls(), spore_balls=self.spore_manager.get_balls(), players=self.player_manager.get_players(), player_num_per_team=self.player_num_per_team) elif self.save_frame: self.playback_util.save_step(diff_balls_remove=self.diff_balls_remove, diff_balls_modify=self.diff_balls_modify, leaderboard=self.leaderboard, last_frame_count=self.last_frame_count) self.diff_balls_remove = [[], [], [], []] self.diff_balls_modify = [{}, {}, {}, {}] if self.last_frame_count >= self.frame_limit: if not self._end_flag: self.playback_util.save_final(self.cfg) self._end_flag = True return self._end_flag def obs(self, obs_type='all'): assert obs_type in ['all', 'single'] global_state = self.get_global_state() player_states = self.player_states_util.get_player_states(food_balls=self.food_manager.get_balls(), thorns_balls=self.thorns_manager.get_balls(), spore_balls=self.spore_manager.get_balls(), players=self.player_manager.get_players()) self.leaderboard = global_state['leaderboard'] return global_state, player_states, {'eats': self.eats} def get_global_state(self): team_name_score = self.player_manager.get_teams_score() global_state = { 'border': [self.map_width, self.map_height], 'total_frame': self.frame_limit, 'last_frame_count': self.last_frame_count, 'last_time':self.last_frame_count, 'leaderboard': { i: team_name_score[i] for i in range(self.team_num) } } return global_state def get_player_names(self): return self.player_manager.get_player_names() def get_team_names(self): return self.player_manager.get_team_names() def get_player_names_with_team(self): return self.player_manager.get_player_names_with_team() def get_team_infos(self): return self.player_manager.get_team_infos() def close(self): if hasattr(self, 'render'): self.render.close() def seed(self, seed=None): if seed is None: self._seed = random.randrange(sys.maxsize) else: self._seed = seed self._random = random.Random(self._seed)