進化計算ライブラリDEAPでレイリー減衰パラメータ探査

前回の「遺伝的アルゴリズムによるレイリー減衰のパラメータ探査」で試した、一定減衰に相当するレイリー減衰のパラメータ探査を、今回はPythonの進化計算ライブラリのDEAPを使ってやってみます。

プログラムは、データサイエンス教育総合研究所のサイトを参考にさせていただきました。参考にしたというより、目的関数のところを一定減衰Shakeとレイリー減衰応答解析の差分2乗和とする関数に置き換えただけです。

前回は、遺伝的アルゴリズムをプログラミングしてパラメータを探しましたが、DEAPを使うとアルゴリズムの詳細をプログラミングしなくてもパラメータ探査できます。

実行結果は以下のとおり。

最も良い個体は [2.572249424150136, 6.2771604118935755]で、そのときの目的関数の値は (7.337222985431525,)

上記の個体の数値がレイリー減衰パラメータである二つの振動数になります。

このレイリー減衰のパラメータで応答計算し、評価位置で応答スペクトルを比較したものが下図です。

DEAPによるレイリー減衰パラメータ探査

一定減衰とレイリー減衰の応答性状がよく合致しています。

参考までに、全く的外れなレーリー減衰のパラメータとして、0.02Hzと50Hzを指定した場合の比較は下図になります。

不適切なレイリー減衰パラメータの応答スペクトル比較

以下のプログラムでは、この0.02Hzから50Hzを遺伝子の範囲としています。

#個体の各遺伝子を決めるために使用
import random
import numpy as np
import pandas as pd

#DEAPの中にある必要なモジュールをインポート
from deap import base
from deap import creator
from deap import tools
from deap import algorithms

from FreeField import FreeField

#最小化問題として設定(-1.0で最小化、1.0で最大化問題)
#面食らうかもしれませんが、目的が最小化の場合は以下のように設定します。
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))

#個体の定義(list型と指定しただけで、中身の遺伝子は後で入れる)
#これも面食らうかもしれませんが、個体を準備したと思ってください。
creator.create("Individual", list, fitness=creator.FitnessMin)

#目的関数の定義。必ずreturnの後に,をつける
#レイリー減衰の応答解析とターゲットとなる一定減衰Shakeの差分2乗和を計算する
def resp(individual):
    #一定減衰Shakeのターゲットスペクトルの読み込み
    df_target_spec = pd.read_csv("shake.csv")
    #レイリー減衰応答解析のスペクトル計算
    FreeField_spec = np.array([FreeField(individual).resp()])
    object = np.sum((df_target_spec["shake"] - FreeField_spec[0]) ** 2)
    return object,

#各種関数の設定を行います
#交叉、選択、突然変異などには、DEAPのToolbox内にある関数を利用
toolbox = base.Toolbox()
#random.uniformの別名をattribute関数として設定。各個体の遺伝子の中身を決める関数(各遺伝子は0.02Hz~50Hzのランダムな値)
toolbox.register("attribute", random.uniform, 0.02,50)
#individualという関数を設定。それぞれの個体に含まれる2個の遺伝子をattributeにより決めるよ、ということ。
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attribute, 2)
#集団の個体数を設定するための関数を準備
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
#トーナメント方式で次世代に子を残す親を選択(tornsizeは各トーナメントに参加する個体の数)
toolbox.register("select", tools.selTournament, tournsize=5)
#交叉関数の設定。ブレンド交叉という手法を採用
toolbox.register("mate", tools.cxBlend,alpha=0.2)
#突然変異関数の設定。indpbは各遺伝子が突然変異を起こす確率。muとsigmaは変異の平均と標準偏差
toolbox.register("mutate", tools.mutGaussian, mu=[0.0, 0.0], sigma=[20.0, 20.0], indpb=0.2)
#評価したい関数の設定(目的関数のこと)
toolbox.register("evaluate", resp)

#以下でパラメータの設定
#今回は最も単純な遺伝的アルゴリズムの手法を採用

#乱数を固定
random.seed(64)
#何世代まで行うか
NGEN = 30
#集団の個体数
POP = 10
#交叉確率
CXPB = 0.9
#個体が突然変異を起こす確率
MUTPB = 0.1

#集団は80個体という情報の設定
pop = toolbox.population(n=POP)
#集団内の個体それぞれの適応度(目的関数の値)を計算
for individual in pop:
    individual.fitness.values = toolbox.evaluate(individual)
#パレート曲線上の個体(つまり、良い結果の個体)をhofという変数に格納
hof = tools.ParetoFront()

#今回は最も単純なSimple GAという進化戦略を採用
algorithms.eaSimple(pop, toolbox, cxpb=CXPB, mutpb=MUTPB, ngen=NGEN, halloffame=hof)

#最終的な集団(pop)からベストな個体を1体選出する関数
best_ind = tools.selBest(pop, 1)[0]
#結果表示
print("最も良い個体は %sで、そのときの目的関数の値は %s" % (best_ind, best_ind.fitness.values))

以下、関連リンク
DEAPの本家サイト

遺伝的アルゴリズムによるレイリー減衰のパラメータ探査

遺伝的アルゴリズム(genetic algorithm、略称:GA)は、古くからある近似解探査法の一つで、パラメータを生物の遺伝子に見立て、良い遺伝子を選択して交叉と突然変異を行いながら、世代交代を繰り返して最適なパラメータを探しあてる手法です。

有名なところでは、新幹線のN700系のフロントノーズの設計に使われました。

以下は一定減衰(一次元波動論)の応答スペクトルをターゲットにして、レイリー減衰(質点系)の振動数パラメータを遺伝子としてGAで最適な解を探査したものです。

図はGAで計算した各世代の最優良パラメータにおける応答スペクトルの進化をアニメgifで表しています。

各世代の応答スペクトルの進化

モデルの概要は以下です。

解析モデル概要

このアプローチでは遺伝子が二つで、あっという間に優良解が見つかってしまいます。

しかし、各部別レイリー減衰で物性ごとにレイリー減衰を設定する場合は遺伝子の数を増やすことになるので、こういった探査法が有益だと思います。

また、水平動だけでなく、上下動も同時に評価して最適解を求めたいときは、優良遺伝子の判定を工夫することで同様に探査が可能です。

ところで、質点系レイリー減衰のモデルは、外部減衰の接続先が半無限性を模擬するダンパー端部の固定点です。この固定点と、質点系モデルとの相対速度は工学的には無意味なので、減衰が適切に評価されませんが、このアプローチは、そうしたモデルの矛盾も吸収してレイリー減衰のパラメータを決定していることになります。

以下、関連リンク

# coding:utf-8

import numpy as np
from numpy import random as rand
import pandas as pd
from FreeField import FreeField
import matplotlib.pyplot as plt

from PIL import Image
import glob

class GeneticAlgorithm(object):
    """遺伝的アルゴリズム"""

    def __init__(self):
        """初期化"""

        """定数"""
        # 染色体数
        self.N = 2
        # 個体数
        self.M = 10
        # エリート選択個数
        self.n_elite = 3
        self.p_mutation = 0.10

        """変数"""
        # 第g世代評価降順インデックス
        self.val_sorted_index = []
        # 選択された遺伝子のインデックス
        self.selected_index = []
        # 次世代遺伝子
        self.next_generation = []
        # 各世代の最良応答スペクトル
        self.best_spec = []

        """初期値"""
        # 第g世代遺伝子群
        self.X_g = np.array([[rand.uniform(0.2,50.0),       # f1
                             rand.uniform(0.2,50.0)]        # f2
                             for i in range(self.M)])

        """ターゲットスペクトル"""
        self.df_target_spec = pd.read_csv("shake.csv")


    def assess(self):
        """評価"""
        # ターゲットスペクトルとの差分2乗和を評価値とする
        FreeField_spec = np.array([FreeField(x).resp() for x in self.X_g])
        R = []
        for spec in FreeField_spec:
            R.append(np.sum((self.df_target_spec["shake"] - spec) ** 2))
        R = np.array(R)
        self.val_sorted_index = np.argsort(R)[::1]
        self.best_spec.append(FreeField_spec[self.val_sorted_index[0]])


    def select(self):
        """選択"""
        # エリート選択で N 個
        elite_index = self.val_sorted_index[0:self.n_elite]
        self.selected_index = np.r_[elite_index]
        print("selected_index",self.selected_index)


    def crossover1p(self):
        """交叉"""
        ng = []
        for i in range(self.M - self.n_elite):
            p_index = rand.choice(self.selected_index, 2, replace=False)
            new_g = np.r_[self.X_g[p_index[0]][0], self.X_g[p_index[1]][1]]
            ng.append(new_g)

        ng = np.array(ng)
        self.next_generation = np.r_[ng, self.X_g[self.selected_index[0:self.n_elite]]]


    def mutation(self):
        """突然変異"""
        # 確率的に選ばれた染色体をランダム値に変更
        # 突然変異を起こす染色体の数
        n_mutation = rand.binomial(n=self.M * self.N, p=self.p_mutation)

        # 突然変異
        for i in range(n_mutation):
            m = rand.randint(self.M)
            n = rand.randint(self.N)
            self.next_generation[m][n] = rand.uniform(0.2,50.0)


    def alternation(self):
        """世代交代"""
        self.X_g = self.next_generation


if __name__ == '__main__':

    """設定"""
    # 計算世代
    G = 50

    """計算"""
    ga = GeneticAlgorithm()
    for g in range(G):
        # 評価
        ga.assess()
        # 選択
        ga.select()
        # 交叉
        ga.crossover1p()
        # 突然変異
        ga.mutation()
        # 世代交代
        ga.alternation()

    for i, spec in enumerate(ga.best_spec):
        # グラフ描画
        plt.plot(ga.df_target_spec["t"], ga.df_target_spec["shake"], label="Target")
        plt.plot(ga.df_target_spec["t"], spec, label="GA")
        plt.title("GA = "+str(i))
        plt.xlabel("Period(sec)")
        plt.ylabel("Acceleration Response Spectrum $(m/s^{2})$")
        plt.xscale("log")
        plt.ylim(0.0, 20.0)
        plt.legend()
        plt.grid()
        png = "spec_{:0>2d}.png".format(i)
        plt.savefig(png)
        plt.clf()

    # GIF 作成
    files = sorted(glob.glob('*.png'))  
    images = list(map(lambda file : Image.open(file) , files))
    images[0].save('GA.gif' , save_all = True , append_images = images[1:] , duration = 1000)
# coding:utf-8
import numpy as np
from numpy.lib.function_base import append
import pandas as pd
import math

class FreeField(object):

    def __init__(self, x):
        self.f1 = x[0]
        self.f2 = x[1]


    # 時刻歴応答解析
    def resp(self):
        f1 = self.f1
        f2 = self.f2

        model = pd.read_csv("model.csv")
        h1 = model["h"][0]
        h2 = model["h"][0]

        size = len(model["M"])
        # 質量マトリクス
        m = np.diag(list(map(lambda x: x / 9.80665, model["M"])))
        # 剛性マトリクス
        k = np.zeros((size, size))
        kdata = size -1
        for i in range(kdata):
            i1 = i
            i2 = i + 1
            k[i1][i1] = k[i1][i1] + model["K"][i]
            k[i2][i2] = k[i2][i2] + model["K"][i]
            k[i1][i2] = k[i1][i2] - model["K"][i]
            k[i2][i1] = k[i2][i1] - model["K"][i]
        # 減衰マトリクス(レイリー減衰)
        w1 = 2.0 * np.pi * f1
        w2 = 2.0 * np.pi * f2
        alpha = (2.0 * w1 * w2 * (h2 * w1 - h1 * w2)) / (w1 ** 2 - w2 ** 2)
        beta = (2.0 * (h1 * w1 - h2 * w2)) / (w1 ** 2 - w2 ** 2)
        c = np.array(alpha * m + beta * k)

        # 底面ダンパー(2E入力用)
        c[kdata][kdata] = c[kdata][kdata] + model["C"][0]

        # 入力波読み込み
        df = pd.read_csv(model["accfile"][0])
        acc0 = df["acc"]
        dt = 0.01
        beta = 0.25
        unit_vector = np.ones(size)
        pre_acc0 = 0.0
        time = 0.0
        dis = np.zeros(size)
        vel = np.zeros(size)
        acc = np.zeros(size)
        ddis = np.zeros(size)
        dvel = np.zeros(size)
        dacc = np.zeros(size)
        acc_history = {}
        for i in range(0, size):
            acc_history[i] = []
        time_history = []
        # Newmarkβ法による数値解析(増分変位による表現)
        for i in range(0, len(acc0)):
            kbar = k + (1.0/(2.0*beta*dt)) * c + (1.0/(beta*dt**2.0)) * m
            dp1 = -1.0 * m.dot(unit_vector) * (acc0[i] - pre_acc0)
            dp2 = m.dot((1.0/(beta*dt))*vel + (1.0/(2.0*beta))*acc)
            dp3 = c.dot((1.0/(2.0*beta))*vel + (1.0/(4.0*beta)-1.0)*acc*dt)
            dp = dp1 + dp2 + dp3
            ddis = np.linalg.inv(kbar).dot(dp)
            dvel = (1.0/(2.0*beta*dt))*ddis - (1.0/(2.0*beta))*vel - ((1.0/(4.0*beta)-1.0))*acc*dt
            dacc = (1.0/(beta*dt**2.0))*ddis - (1.0/(beta*dt))*vel - (1.0/(2.0*beta))*acc
            dis += ddis
            vel += dvel
            acc += dacc
            acc_abs = acc + [acc0[i] for n in range(0,size)]
            [acc_history[i].append(x) for i, x in enumerate(acc_abs)]
            time_history.append(time)
            time += dt
            pre_acc0 = acc0[i]

        # 評価対象として最上部の応答スペクトルを計算する
        spec = self.duhm_spec(acc_history[0])
        return(spec)


    def duhm_spec(self, acc):
        spec = np.empty(0)
        dlta = (math.log(5.0) - math.log(0.02)) / float(300 - 1)
        for i in range(1, 300 + 1):
            tmp = math.log(0.02) + dlta * float(i - 1)
            period = math.exp(tmp)
            zmax = self.duhm(0.01, 0.05, period, acc)
            spec = np.append(spec, zmax)
        return(spec)


    def duhm(self, dt, h, T, a):
        """duhamel integral"""
        w = 2.0 * math.pi / T
        w2 = w * w
        hw = h * w
        wd = w * math.sqrt( 1.0 - h * h )
        wdt = wd * dt
        e = math.exp( -hw * dt )
        cwdt = math.cos( wdt )
        swdt = math.sin( wdt )
        e11 = e * ( cwdt - hw * swdt / wd )
        e12 = -e * ( wd * wd + hw * hw ) * swdt / wd
        e21 = e * swdt / wd
        e22 = e * ( cwdt + hw * swdt / wd )
        ss = - hw * swdt - wd * cwdt
        cc = - hw * cwdt + wd * swdt
        s1 = ( e * ss + wd ) / w2
        c1 = ( e * cc + hw ) / w2
        s2 = ( e * dt * ss + hw * s1 + wd * c1 ) / w2
        c2 = ( e * dt * cc + hw * c1 - wd * s1 ) /w2
        s3 = dt * s1 - s2
        c3 = dt * c1 - c2
        g11 = ( -hw * s3 + wd * c3 ) / wdt
        g12 = ( -hw * s2 + wd * c2 ) / wdt
        g21 = s3 / wdt
        g22 = s2 / wdt
        dx = 0.0
        x = 0.0
        accmax = 0.0
        for m in range(1, len(a)):
            dxf = dx
            xf = x
            ddym = a[ m ]
            ddyf = a[ m - 1 ]
            dx = e11 * dxf + e12 * xf + g11 * ddym + g12 * ddyf
            x = e21 * dxf + e22 * xf + g21 * ddym + g22 * ddyf
            ddx = 2.0 * hw * dx + w2 * x
            if abs(ddx) > abs(accmax):
                accmax = abs(ddx)

        return(accmax)

使用したデータも含め、プログラムはこちらにも置いています。