Optimization Algorithms | Solving the Distributed Flexible Job Shop Scheduling Problem with a Deep Q-Network (DQN) Based Neighborhood Search Algorithm (Partial Code)

Posted: 2024-11-22 07:00:36

The N6 Neighborhood Operator

N6 perturbs only the critical factory (the one whose schedule determines the makespan): it extracts that factory's operation sequence, finds the critical path and its critical blocks, and applies insertion moves inside each block. In the first block a randomly chosen operation is moved to the block tail; in the last block one is moved to the block head; interior blocks receive both moves.
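Before the full operator, a stand-alone toy illustration (with hypothetical values) of the forward insertion move its shifting loops implement: remove the element at position i and re-insert it at position j > i.

seq = [10, 20, 30, 40, 50]      # hypothetical operation sequence
i, j = 1, 3                     # move the element at i to position j
tmp = seq[i]
for k in range(i, j):           # shift the gap one slot to the left
    seq[k] = seq[k + 1]
seq[j] = tmp
print(seq)                      # [10, 30, 40, 20, 50]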

import numpy as np
import random

def N6(p_chrom, m_chrom, f_chrom, fitness, num_job, job_operation_matrix, num_operation, time, num_machine, num_factory):
    # Decode the operation-sequence chromosome: s2[i] records which operation
    # of job s1[i] the gene at position i encodes (1-based occurrence count);
    # s2 is not used further inside this operator.
    s1 = p_chrom
    s2 = np.zeros(num_operation, dtype=int)
    p = np.zeros(num_job, dtype=int)
    for i in range(num_operation):
        p[s1[i]] += 1
        s2[i] = p[s1[i]]
    # Per-factory views: P[f] is factory f's operation sequence, IP[f] the
    # global positions those genes came from, and FJ[f] the jobs assigned to f.
    P = []
    IP = []
    FJ = []
    for f in range(num_factory):
        P.append([])
        IP.append([])
        FJ.append([])

    # Split the global operation sequence by the factory of each gene's job.
    for i in range(num_operation):
        t1 = s1[i]
        t3 = f_chrom[t1]
        P[t3].append(p_chrom[i])
        IP[t3].append(i)
    for i in range(num_job):
        t3 = f_chrom[i]
        FJ[t3].append(i)

    # fitness[2] stores the index of the critical factory; only that factory's
    # sequence is perturbed. CB holds the critical blocks (runs of consecutive
    # critical operations on the same machine) and block is their count.
    cf = int(fitness[2])
    CP, CB, block = FindCriticalPathDHFJSP(P[cf], m_chrom, FJ[cf], cf, num_job, job_operation_matrix, time, num_machine)
    for i in range(block):
        BL = len(CB[i])
        if BL > 1:
            if i == 0:
                # First block: move a random operation (any but the last one)
                # to the tail of the block.
                Index1 = int(np.floor(random.random() * (BL - 1)))
                Index2 = BL - 1
                Index1 = CB[i][Index1]; Index2 = CB[i][Index2]
                tmp = P[cf][Index1]
                for j in range(Index1, Index2):
                    P[cf][j] = P[cf][j + 1]
                P[cf][Index2] = tmp
            if i == block - 1:
                # Last block: move a random operation (any but the first one)
                # to the head of the block.
                Index1 = 0
                Index2 = int(np.floor(random.random() * (BL - 1)) + 1)
                Index1 = CB[i][Index1]; Index2 = CB[i][Index2]
                tmp = P[cf][Index2]
                for j in range(Index2, Index1, -1):
                    P[cf][j] = P[cf][j - 1]
                P[cf][Index1] = tmp
            if 0 < i < block - 1 and BL > 2:
                # Interior block: move one random interior operation to the
                # tail, then another to the head.
                Index1 = int(np.floor(random.random() * (BL - 2)) + 1)
                Index2 = BL - 1
                Index1 = CB[i][Index1]; Index2 = CB[i][Index2]
                tmp = P[cf][Index1]
                for j in range(Index1, Index2):
                    P[cf][j] = P[cf][j + 1]
                P[cf][Index2] = tmp
                Index1 = 0
                Index2 = int(np.floor(random.random() * (BL - 2)) + 1)
                Index1 = CB[i][Index1]; Index2 = CB[i][Index2]
                tmp = P[cf][Index2]
                for j in range(Index2, Index1, -1):
                    P[cf][j] = P[cf][j - 1]
                P[cf][Index1] = tmp
    # Machine and factory assignments are unchanged; write the reordered
    # factory sequences back to their original global positions.
    newm = m_chrom
    newf = f_chrom
    newp = np.zeros(num_operation, dtype=int)
    for f in range(num_factory):
        L = len(IP[f])
        for i in range(L):
            newp[IP[f][i]] = P[f][i]
    return newp, newm, newf
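For orientation, a sketch of how N6 would be invoked inside the local-search loop. The decoder that re-evaluates a perturbed solution is not part of this excerpt, so evaluate below is a hypothetical placeholder and the acceptance rule is only illustrative.

# Hypothetical call pattern; all inputs come from the current individual.
newp, newm, newf = N6(p_chrom, m_chrom, f_chrom, fitness, num_job,
                      job_operation_matrix, num_operation, time,
                      num_machine, num_factory)
# new_fitness = evaluate(newp, newm, newf)   # hypothetical decoder
# if new_fitness[0] < fitness[0]:            # accept if the makespan improves
#     p_chrom, m_chrom, f_chrom, fitness = newp, newm, newf, new_fitness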

The DQN Network
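The class below instantiates a Net module that this excerpt does not include. A minimal sketch, assuming a plain one-hidden-layer MLP; the hidden width of 50 is an assumption, not taken from the source.

import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    # Assumed Q-network: maps a state vector to one Q-value per action.
    def __init__(self, inDim, outDim):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(inDim, 50)   # hidden width is an assumption
        self.out = nn.Linear(50, outDim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.out(x)                # raw Q-values (no softmax)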

import random
import numpy as np
import torch
import torch.nn as nn

class DQN(object):
    def __init__(self, inDim, outDim, BATCH_SIZE, LR, EPSILON, GAMMA, MEMORY_CAPACITY, TARGET_REPLACE_ITER):
        self.eval_net, self.target_net = Net(inDim, outDim), Net(inDim, outDim)
        self.N_STATES = inDim
        self.N_ACTIONS = outDim
        self.learn_step_counter = 0                                     # for target updating
        self.memory_counter = 0                                         # for storing memory
        self.BATCH_SIZE = BATCH_SIZE
        self.LR = LR
        self.EPSILON = EPSILON
        self.GAMMA = GAMMA
        self.MEMORY_CAPACITY = MEMORY_CAPACITY
        self.TARGET_REPLACE_ITER = TARGET_REPLACE_ITER
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)
        # self.optimizer = torch.optim.SGD(self.eval_net.parameters(), lr=LR)
        # The replay memory is a NumPy array; each row stores one transition:
        # state, action, reward, next state.
        self.memory = np.zeros((MEMORY_CAPACITY, self.N_STATES * 2 + 2))     # initialize memory

        self.loss_func = nn.MSELoss()
        # Both networks and the loss live on the GPU; a CUDA device is required.
        self.eval_net, self.target_net = self.eval_net.cuda(), self.target_net.cuda()
        self.loss_func = self.loss_func.cuda()

    def choose_action(self, x):
        # x is a single state vector; add a batch dimension and move it to GPU.
        x = torch.unsqueeze(torch.FloatTensor(x), 0).cuda()

        if np.random.uniform() < self.EPSILON:   # greedy branch
            actions_value = self.eval_net(x)     # shape = (1, N_ACTIONS)
            actions_value = actions_value.detach().cpu().numpy()

            actions_value[actions_value <= 0] = 0.001               # no non-positive values
            actions_value = actions_value / np.sum(actions_value)   # normalize
            # Take the argmax; collect every index tied for the maximum and
            # break ties uniformly at random.
            actions_value_ = actions_value[0]
            index = 0
            max_v = actions_value_[0]
            for i in range(1, self.N_ACTIONS):
                if max_v < actions_value_[i]:
                    index = i
                    max_v = actions_value_[i]
            max_action = np.array([index])
            for i in range(self.N_ACTIONS):
                if max_v == actions_value_[i] and index != i:
                    max_action = np.hstack((max_action, i))
            ml = len(max_action)
            if ml > 1:
                action = max_action[random.randint(0, ml - 1)]
            else:
                action = max_action[0]
        else:   # exploration branch: uniform random action
            action = np.random.randint(0, self.N_ACTIONS)
        return action


    def learn(self):
        # Periodically copy the online network's weights into the target network.
        if self.learn_step_counter % self.TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1

        # Sample a batch of transitions (assumes the memory has been filled;
        # otherwise all-zero rows may be drawn).
        sample_index = np.random.choice(self.MEMORY_CAPACITY, self.BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_current_state = torch.FloatTensor(b_memory[:, :self.N_STATES]).cuda()
        b_action = torch.LongTensor(b_memory[:, self.N_STATES:self.N_STATES + 1].astype(int)).cuda()
        b_reward = torch.FloatTensor(b_memory[:, self.N_STATES + 1:self.N_STATES + 2]).cuda()
        b_next_state = torch.FloatTensor(b_memory[:, -self.N_STATES:]).cuda()

        # Q(s, a) from the online network for the actions actually taken.
        q_eval = self.eval_net(b_current_state).gather(1, b_action)  # shape (batch, 1)
        # Bellman target r + gamma * max_a' Q_target(s', a'); detach so no
        # gradients flow into the target network.
        q_next = self.target_net(b_next_state).detach()
        q_target = b_reward + self.GAMMA * q_next.max(1)[0].view(self.BATCH_SIZE, 1)  # shape (batch, 1)
        loss = self.loss_func(q_eval, q_target)
        losses = loss.detach().cpu().numpy()
        print('train loss MSE =', losses)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return losses
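learn() samples from self.memory, but the excerpt omits the method that fills it. A minimal sketch consistent with the row layout documented above (state, action, reward, next state), to be added inside the DQN class; the name store_transition is borrowed from common DQN tutorials and is an assumption, not confirmed by the source.

    def store_transition(self, s, a, r, s_):
        # Assumed helper: pack one transition into a row and overwrite the
        # oldest entry once the buffer is full (circular indexing).
        transition = np.hstack((s, [a, r], s_))
        index = self.memory_counter % self.MEMORY_CAPACITY
        self.memory[index, :] = transition
        self.memory_counter += 1

Hypothetical usage of the class (all hyperparameter values are illustrative, and a CUDA device is required because the class moves every tensor to the GPU):

dqn = DQN(inDim=9, outDim=6, BATCH_SIZE=32, LR=1e-3, EPSILON=0.9,
          GAMMA=0.95, MEMORY_CAPACITY=2000, TARGET_REPLACE_ITER=100)
state = np.random.rand(9)
action = dqn.choose_action(state)          # epsilon-greedy operator choice
# ... apply the chosen neighborhood operator, observe reward and next state ...
# dqn.store_transition(state, action, reward, next_state)
# if dqn.memory_counter > dqn.MEMORY_CAPACITY:
#     dqn.learn()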