Source code for btb.selection.best

import logging

import numpy as np

from btb.selection.ucb1 import UCB1

# the minimum number of scores that each choice must have in order to use best-K
# optimizations. If not all choices meet this threshold, default UCB1 selection
# will be used.
K_MIN = 2

logger = logging.getLogger('btb')


class BestKReward(UCB1):
    """Best K reward selector

    Computes the average reward from the past scores by using only the
    highest k scores. In implementation, the other scores are replaced
    with ``nan``s such that they still factor into the number of arm pulls.

    Args:
        k (int): number of best scores to consider
    """

    def __init__(self, choices, k=K_MIN):
        super(BestKReward, self).__init__(choices)
        self.k = k

    def compute_rewards(self, scores):
        """Retain the K best scores, and replace the rest with nans"""
        if len(scores) > self.k:
            scores = np.copy(scores)
            inds = np.argsort(scores)[:-self.k]
            scores[inds] = np.nan

        return list(scores)

    def select(self, choice_scores):
        """Select a choice using the K best scores

        Keeps the choice counts intact, but only lets the bandit see the top
        k learners' scores. If there is not enough score history to do
        K-selection, use the default UCB1 reward function.
        """
        min_num_scores = min(len(s) for s in choice_scores.values())
        if min_num_scores >= K_MIN:
            logger.info('{klass}: using Best K bandit selection'
                        .format(klass=type(self).__name__))
            compute_rewards = self.compute_rewards
        else:
            logger.warning(
                '{klass}: Not enough choices to do K-selection; using plain UCB1'
                .format(klass=type(self).__name__))
            compute_rewards = super(BestKReward, self).compute_rewards

        # convert the raw scores list for each choice to a "rewards" list
        choice_rewards = {
            choice: compute_rewards(choice_scores[choice])
            for choice in choice_scores
            if choice in self.choices
        }

        return self.bandit(choice_rewards)
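
# A minimal usage sketch, assuming two hypothetical choices ('rf' and 'svm')
# with made-up score histories. Since every choice has at least K_MIN scores,
# select() masks all but each choice's k best scores with NaN before handing
# the rewards to the underlying UCB1 bandit.
def _example_best_k_reward():
    selector = BestKReward(['rf', 'svm'], k=2)

    # Hypothetical score histories, one list per choice.
    choice_scores = {
        'rf': [0.81, 0.79, 0.84],
        'svm': [0.76, 0.77, 0.90],
    }

    # Keeps the 2 best scores and replaces the rest with NaN,
    # e.g. [0.81, nan, 0.84] for 'rf'.
    rewards = selector.compute_rewards(choice_scores['rf'])

    # Returns the name of the chosen arm ('rf' or 'svm').
    best = selector.select(choice_scores)

    return rewards, best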

class BestKVelocity(BestKReward):
    """Best K velocity selector"""

    def compute_rewards(self, scores):
        """Compute the velocity of the best scores

        The velocities are the k distances between the k+1 best scores.
        """
        k = self.k
        m = max(len(scores) - k, 0)
        best_scores = sorted(scores)[-k - 1:]
        velocities = np.diff(best_scores)
        nans = np.full(m, np.nan)

        return list(velocities) + list(nans)
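
# A minimal sketch of the reward transformation in BestKVelocity, using a
# hypothetical score history. With k=2, the rewards are the two gaps between
# the three best scores, padded with NaNs so the reward list is as long as
# the score history.
def _example_best_k_velocity():
    selector = BestKVelocity(['rf', 'svm'], k=2)

    scores = [0.70, 0.80, 0.85, 0.91]

    # sorted(scores)[-3:]          -> [0.80, 0.85, 0.91]
    # np.diff(...)                 -> roughly [0.05, 0.06]
    # plus len(scores) - k = 2 NaNs -> [0.05, 0.06, nan, nan]
    return selector.compute_rewards(scores)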