【Python】真にランダムな組み合わせを考えてみた

tsucci

まえがき

皆さんこんにちは、こんばんは、つっちーです。

ご存知かと思いますが、DWSではフルリモートワークで業務を行なっております。
って周りの人に言うと、
『えっ?それでちゃんとコミュニケーションとか取れてるの??』
みたいなことを言われることがちらほらあります。

DWSではコミュニケーションの一環として、毎日ランダムに選ばれた数名でフリートークする時間が設けられています。
素晴らしい施策ですね。
それは良いとして、最近気になっているのはその組み合わせです。

『あれ昨日も一昨日も一緒でしたよね?』
『みんな同じプロジェクトメンバー?この時間はプロジェクトミーティングですか??』

みたいな事がちらほら。。
Zoomのブレイクアウトルーム機能を使っているのですが、この機能に限らず偏っているな、って感じることありますよね。
システム時刻なんかを元に生成した乱数で、その場限りの乱択なので、仕方がないと言えばそれまでなのですが、
今回はちょっと掘り下げて、可能な限り、真にランダムな組み合わせを考えてみたいと思います。

問題

解決したい点は以下とします。

  1. 同じプロジェクトの人との組み合わせは、極力避ける
  2. 前日以前に同じ組み合わせだった人との組み合わせは、極力避ける

アプローチ

これはいわるゆる最適化問題に分類されます。WIkipediaによると、

最適化問題(さいてきかもんだい、英: optimization problem)とは、特定の集合上で定義された実数値関数または整数値関数についてその値が最小(もしくは最大)となる状態を解析する問題である[1]。こうした問題は総称して数理計画問題(すうりけいかくもんだい、英: mathematical programming problem, mathematical program)、数理計画とも呼ばれる

難しそうな問題ですね(実際難しいです。。)
何か良いライブラリはないかなーっと探してたらありましたっ!

https://github.com/perrygeo/simanneal

どうやら「焼きなまし法」を簡単に実装できるPythonのパッケージみたいですね。
おっ、焼きなまし法なら昔勉強したぞ、これはいけそうっ、ということで早速やってみます。

焼きなまし法

またしてもWIkipediaによると、

焼きなまし法(やきなましほう、英: Simulated Annealing、SAと略記、疑似アニーリング法、擬似焼きなまし法、シミュレーティド・アニーリングともいう)は、大域的最適化問題への汎用の乱択アルゴリズムである。広大な探索空間内の与えられた関数の大域的最適解に対して、よい近似を与える。

ものすごくざっくり言うと、

ある「状態」に対して、何かを「作用」させて、状態が改善したら採用し、改悪なら不採用とする、ということを有限回繰り返す。
試行回数が十分であれば、最適解を得る確率が1に近づくことが一般に知られている。

と言うものです。

simannealパッケージ

「焼きなまし」と言う言葉が金属工学から来ているので、温度とかエネルギーというワードが出てきますね。
ざっくり要約していきます。チュートリアルは巡回セールスマン問題です。

  1. クラスを継承して問題を定義する

    from simanneal import Annealer
    class TravellingSalesmanProblem(Annealer):
    """Test annealer with a travelling salesman problem."""
  2. 作用関数を定義する

    def move(self):
        """Swaps two cities in the route."""
        a = random.randint(0, len(self.state) - 1)
        b = random.randint(0, len(self.state) - 1)
        self.state[a], self.state[b] = self.state[b], self.state[a]
  3. 目的関数を定義する

    def energy(self):
        """Calculates the length of the route."""
        e = 0
        for i in range(len(self.state)):
            e += self.distance(cities[self.state[i - 1]],
                          cities[self.state[i]])
        return e
  4. 実行する

    initial_state = ['New York', 'Los Angeles', 'Boston', 'Houston']
    tsp = TravellingSalesmanProblem(initial_state)
    tsp.steps = 10**6 # 試行回数
    itinerary, miles = tsp.anneal()
    # ...
    tsp.state # これが最適化された状態

やってみる

問題を再掲します。

  1. 同じプロジェクトの人との組み合わせは、極力避ける
  2. 前日以前に同じ組み合わせだった人との組み合わせは、極力避ける

各メンバーとプそのロジェクトをリスト化する

members = [
    ["山田", "p1"],
    ["田中", "p2"],
    ["伊藤", "p3"],
    ["村田", "p2"],
    // ...
]

状態を定義する

# 仮で24人を3人ごと(8チーム)に分けることを想定
num_members = len(members)
num_team = 8
member_team = [i % num_team for i in range(num_members)]
team_to_member = [[] for _ in range(num_team)]
project_count_by_team = [defaultdict(int) for _ in range(num_team)]

for i in range(num_members):
    team_to_member[member_team[i]].append(i)
    project_count_by_team[member_team[i]][members[i][1]] += 1

init_state = [member_team, team_to_member, project_count_by_team]

初期状態

[
 # 初期値として順番にチームを割り当て
 [0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7],
 # 各チームのメンバー
 [[0, 8, 16],
  [1, 9, 17],
  [2, 10, 18],
  [3, 11, 19],
  [4, 12, 20],
  [5, 13, 21],
  [6, 14, 22],
  [7, 15, 23]],
  # 各チームのプロジェクト数
 [defaultdict(<class 'int'>, {'p1': 1, 'p2': 2}),
  defaultdict(<class 'int'>, {'p2': 1, 'p4': 2}),
  defaultdict(<class 'int'>, {'p3': 1, 'p4': 1, 'p2': 1}),
  defaultdict(<class 'int'>, {'p1': 1, 'p3': 1, 'p4': 1}),
  defaultdict(<class 'int'>, {'p2': 1, 'p3': 2}),
  defaultdict(<class 'int'>, {'p3': 2, 'p4': 1}),
  defaultdict(<class 'int'>, {'p4': 2, 'p1': 1}),
  defaultdict(<class 'int'>, {'p1': 1, 'p3': 2})]
 ]

同じにチームになった回数を記録する関数を定義する

# メンバーの二次元リストを0埋めする(初期値)
num_same_team = [[0] * num_members for _ in range(num_members)]

def record_num_same_team(num_same_team, team_to_member):
    num_same_team_next = deepcopy(num_same_team)

    # 同じチームになったらインクリメントする
    for m in team_to_member:
        for i in range(len(m)):
            m1 = m[i]
            for j in range(i+1, len(m)):
                m2 = m[j]
                num_same_team_next[m1][m2] += 1
                num_same_team_next[m2][m1] += 1

    return num_same_team_next

チームtにおける重複度を計算する関数を定義する

def calc_team_cost(t, team_to_members, project_count_by_team, num_same_team):
    # 過去に同じチームになった回数
    g = 0
    team = team_to_members[t]
    for i in range(len(team)):
        m1 = team[i]
        for j in range(i+1, len(team)):
            m2 = team[j]
            g += num_same_team[m1][m2] ** 2

    # プロジェクトの重複数
    d = 0
    for v in project_count_by_team[t].values():
        d += max(0, v-1)**2

    return g + d

作用関数を定義する

def move(self):
    # ランダムにメンバーを選択
    a = choice(range(num_members))
    b = choice(range(num_members))

    # メンバーorメンバーのチームが同じならばスキップ
    if a == b:
        return 0

    a_team = self.state[0][a]
    b_team = self.state[0][b]

    if a_team == b_team:
        return 0

    # 変更前の重複度を計算
    cost_a_before = calc_team_cost(a_team, self.state[1], self.state[2], self.num_same_team)
    cost_b_before = calc_team_cost(b_team, self.state[1], self.state[2], self.num_same_team)

    # メンバーを交換
    self.state[2][a_team][members[a][1]] -= 1
    self.state[2][b_team][members[b][1]] -= 1

    self.state[1][a_team].remove(a)
    self.state[1][b_team].remove(b)

    self.state[0][a], self.state[0][b] = self.state[0][b], self.state[0][a]

    self.state[2][b_team][members[a][1]] += 1
    self.state[2][a_team][members[b][1]] += 1

    self.state[1][b_team].append(a)
    self.state[1][a_team].append(b)

    # 変更後の重複度を計算
    cost_a_after = calc_team_cost(a_team, self.state[1], self.state[2], self.num_same_team)
    cost_b_after = calc_team_cost(b_team, self.state[1], self.state[2], self.num_same_team)

    # 差分を返す
    return cost_a_after - cost_a_before + cost_b_after - cost_b_before

目的関数を定義する

def energy(self):
    # 各チームの重複度の総和(この値を最小化する)
    return sum(calc_team_cost(i, self.state[1], self.state[2], self.num_same_team) for i in range(num_team))

問題を定義する

class GroupingProblem(Annealer):
    def __init__(self, init_state, num_same_team):
        super(GroupingProblem, self).__init__(init_state)
        self.num_same_team = num_same_team

    def move(self):
        # ...

    def energy(self):
        # ...

実行する

if __name__ == "__main__":
    num_same_team = [[0] * num_members for _ in range(num_members)]

    # 1週間分を想定して5回分の重複をみる
    for i in range(5):
        problem = GroupingProblem(init_state, num_same_team)
        problem.steps = 10**5
        problem.copy_strategy = "deepcopy"
        state, e = problem.anneal()
        num_same_team = record_num_same_team(num_same_team, problem.state[1])

実行結果①

take1: [0, 2, 2, 7, 6, 6, 5, 3, 7, 2, 0, 3, 7, 1, 1, 5, 5, 4, 4, 6, 4, 0, 3, 1]
take2: [7, 1, 5, 2, 3, 7, 0, 5, 6, 4, 1, 3, 0, 6, 4, 6, 2, 3, 0, 5, 2, 4, 7, 1]
take3: [4, 3, 7, 7, 7, 3, 4, 1, 0, 0, 1, 6, 6, 3, 2, 2, 5, 2, 1, 6, 4, 5, 5, 0]
take4: [7, 6, 1, 3, 5, 3, 0, 2, 0, 2, 3, 0, 5, 5, 1, 4, 1, 6, 7, 7, 2, 6, 4, 4]
take5: [5, 7, 6, 0, 3, 4, 1, 4, 5, 0, 3, 0, 2, 4, 1, 3, 2, 2, 6, 5, 7, 6, 7, 1]
試行数 プロジェクトの重複 前日以前メンバーの重複
1 0 0
2 0 2
3 2 2
4 4 4
5 2 8

近似なので多少の重複は出ちゃいますね。。
前日以前メンバーの重複については、当然ながら回を重ねるごとに増えていきますね。

実行結果②

前日以前のメンバー重複の重みを増やす(^2 -> ^3)

take1: [3, 6, 5, 6, 2, 2, 2, 7, 0, 4, 7, 3, 0, 1, 4, 6, 5, 0, 1, 3, 4, 1, 5, 7]
take2: [3, 6, 2, 4, 7, 1, 4, 0, 1, 5, 2, 6, 4, 7, 2, 5, 3, 6, 5, 0, 7, 0, 1, 3]
take3: [5, 2, 3, 6, 3, 4, 7, 1, 0, 2, 6, 7, 1, 0, 4, 0, 4, 5, 7, 3, 6, 5, 1, 2]
take4: [1, 1, 1, 3, 2, 6, 5, 6, 5, 3, 4, 3, 4, 4, 0, 0, 7, 0, 6, 7, 7, 5, 2, 2]
take5: [0, 2, 3, 1, 6, 0, 3, 6, 1, 4, 5, 7, 4, 0, 7, 5, 5, 6, 3, 2, 2, 4, 7, 1]
試行数 プロジェクトの重複 前日以前メンバーの重複
1 0 0
2 0 0
3 0 0
4 2 0
5 2 4

お、かなり良い感じじゃないでしょうか。
「焼きなまし法」はかなり汎用的なアルゴリズムかつ、パッケージで簡単に実装できるので
他の組み合わせ問題に出会した時も使えそうです。。

参考文献

AUTHOR
tsucci
tsucci
記事URLをコピーしました