N6 neighborhood operator
import random
import numpy as np

# FindCriticalPathDHFJSP is assumed to be defined elsewhere in the project.
def N6(p_chrom, m_chrom, f_chrom, fitness, num_job, job_operation_matrix, num_operation, time, num_machine, num_factory):
    s1 = p_chrom
    s2 = np.zeros(num_operation, dtype=int)
    p = np.zeros(num_job, dtype=int)
    for i in range(num_operation):
        p[s1[i]] = p[s1[i]] + 1
        s2[i] = p[s1[i]]  # s2[i]: how many operations of job s1[i] have appeared so far
    P0 = []
    P = []   # per-factory operation sequences
    IP = []  # positions of those operations in the global sequence
    FJ = []  # jobs assigned to each factory
    for f in range(num_factory):
        P.append([])
        IP.append([])
        FJ.append([])
    for i in range(num_operation):
        t1 = s1[i]
        t2 = s2[i]
        t3 = f_chrom[t1]
        P[t3].append(p_chrom[i])
        IP[t3].append(i)
    for i in range(num_job):
        t3 = f_chrom[i]
        FJ[t3].append(i)
    cf = int(fitness[2])  # critical factory
    CP, CB, block = FindCriticalPathDHFJSP(P[cf], m_chrom, FJ[cf], cf, num_job, job_operation_matrix, time, num_machine)
    for i in range(block):
        BL = len(CB[i])
        if BL > 1:
            if i == 0:
                # first critical block: insert a randomly chosen operation at the block tail
                Index1 = int(np.floor(random.random() * (BL - 1)))
                Index2 = BL - 1
                Index1 = CB[i][Index1]; Index2 = CB[i][Index2]
                tmp = P[cf][Index1]
                for j in range(Index1, Index2):
                    P[cf][j] = P[cf][j + 1]
                P[cf][Index2] = tmp
            if i == block - 1:
                # last critical block: insert a randomly chosen operation at the block head
                Index1 = 0
                Index2 = int(np.floor(random.random() * (BL - 1)) + 1)
                Index1 = CB[i][Index1]; Index2 = CB[i][Index2]
                tmp = P[cf][Index2]
                for j in range(Index2, Index1, -1):
                    P[cf][j] = P[cf][j - 1]
                P[cf][Index1] = tmp
            if i > 0 and i < block - 1 and BL > 2:
                # inner critical block: move one operation to the tail ...
                Index1 = int(np.floor(random.random() * (BL - 2)) + 1)
                Index2 = BL - 1
                Index1 = CB[i][Index1]; Index2 = CB[i][Index2]
                tmp = P[cf][Index1]
                for j in range(Index1, Index2):
                    P[cf][j] = P[cf][j + 1]
                P[cf][Index2] = tmp
                # ... and another operation to the head
                Index1 = 0
                Index2 = int(np.floor(random.random() * (BL - 2)) + 1)
                Index1 = CB[i][Index1]; Index2 = CB[i][Index2]
                tmp = P[cf][Index2]
                for j in range(Index2, Index1, -1):
                    P[cf][j] = P[cf][j - 1]
                P[cf][Index1] = tmp
    newm = m_chrom
    newf = f_chrom
    newp = np.zeros(num_operation, dtype=int)
    for f in range(num_factory):
        L = len(IP[f])
        for i in range(L):
            newp[IP[f][i]] = P[f][i]
    return newp, newm, newf
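The shift loops inside each critical block implement a plain insertion move: one operation is taken out at one index of P[cf] and re-inserted at the other end of the block, with everything in between shifted by one position. A minimal, self-contained sketch of that move on a plain Python list (the helper name and the sample data are illustrative, not part of the code above):

import random

def insert_move(seq, src, dst):
    """Move seq[src] to position dst, shifting the elements in between by one.

    Behaves like the tmp/shift loops used inside each critical block of N6.
    """
    seq = list(seq)
    item = seq.pop(src)
    seq.insert(dst, item)
    return seq

# example: shift a randomly chosen operation of a block to the block tail
block = [3, 1, 4, 1, 5]                         # operations of one critical block (made-up data)
src = random.randrange(len(block) - 1)          # any position except the last, as in the i == 0 case
print(insert_move(block, src, len(block) - 1))  # e.g. [3, 4, 1, 5, 1] when src == 1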
DQN network
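The DQN class below constructs Net(inDim, outDim) for both the evaluation and target networks, but the Net module itself is not listed in this section. A minimal sketch of such a feed-forward Q-network, given purely as an assumption about the expected interface (the hidden width and activation are illustrative):

import torch
import torch.nn as nn

class Net(nn.Module):
    """Small feed-forward Q-network: a state vector in, one Q-value per action out."""
    def __init__(self, inDim, outDim, hidden=64):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(inDim, hidden)
        self.out = nn.Linear(hidden, outDim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.out(x)  # raw Q-values, shape (batch, outDim)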
import random
import numpy as np
import torch
import torch.nn as nn

class DQN(object):
    def __init__(self, inDim, outDim, BATCH_SIZE, LR, EPSILON, GAMMA, MEMORY_CAPACITY, TARGET_REPLACE_ITER):
        # Net(inDim, outDim) is the Q-network module used by this class (not listed in this section)
        self.eval_net, self.target_net = Net(inDim, outDim), Net(inDim, outDim)
        self.N_STATES = inDim
        self.N_ACTIONS = outDim
        self.learn_step_counter = 0  # for target updating
        self.memory_counter = 0      # for storing memory
        self.BATCH_SIZE = BATCH_SIZE
        self.LR = LR
        self.EPSILON = EPSILON
        self.GAMMA = GAMMA
        self.MEMORY_CAPACITY = MEMORY_CAPACITY
        self.TARGET_REPLACE_ITER = TARGET_REPLACE_ITER
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)
        # self.optimizer = torch.optim.SGD(self.eval_net.parameters(), lr=LR)
        # memory is a NumPy array; each row is one transition: state, action, reward, next state
        self.memory = np.zeros((MEMORY_CAPACITY, self.N_STATES * 2 + 2))  # initialize memory
        self.loss_func = nn.MSELoss()
        self.eval_net, self.target_net = self.eval_net.cuda(), self.target_net.cuda()
        self.loss_func = self.loss_func.cuda()
    def choose_action(self, x):
        # input only one sample
        x = torch.unsqueeze(torch.FloatTensor(x), 0).cuda()
        if np.random.uniform() < self.EPSILON:  # greedy
            actions_value = self.eval_net.forward(x)  # shape = (1, N_ACTIONS)
            actions_value = actions_value.cpu().detach().numpy()
            actions_value[actions_value <= 0] = 0.001  # keep all values positive (no negative probabilities)
            actions_value = actions_value / np.sum(actions_value)  # normalize
            actions_value_ = actions_value[0]
            # find the index of the maximum Q-value
            index = 0
            max_v = actions_value_[0]
            for i in range(1, self.N_ACTIONS):
                if max_v < actions_value_[i]:
                    index = i
                    max_v = actions_value_[i]
            # collect all actions that share the maximum value
            max_action = np.array([index])
            for i in range(self.N_ACTIONS):
                if max_v == actions_value_[i] and index != i:
                    max_action = np.hstack((max_action, i))
            # break ties randomly
            ml = len(max_action)
            if ml > 1:
                bction = random.randint(0, ml - 1)
                action = max_action[bction]
            else:
                action = max_action[0]
        else:  # random exploration
            action = np.random.randint(0, self.N_ACTIONS)
        return action
    def learn(self):
        # target parameter update
        if self.learn_step_counter % self.TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1
        # sample a batch of transitions from memory
        sample_index = np.random.choice(self.MEMORY_CAPACITY, self.BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_current_state = torch.FloatTensor(b_memory[:, :self.N_STATES])
        b_action = torch.LongTensor(b_memory[:, self.N_STATES:self.N_STATES + 1].astype(int))
        b_reward = torch.FloatTensor(b_memory[:, self.N_STATES + 1:self.N_STATES + 2])
        b_next_state = torch.FloatTensor(b_memory[:, -self.N_STATES:])
        b_current_state = b_current_state.cuda()  # current state
        b_action = b_action.cuda()                # current action
        b_reward = b_reward.cuda()                # current reward
        b_next_state = b_next_state.cuda()        # next state
        # q_eval w.r.t. the action taken in the experience
        q_eval = self.eval_net(b_current_state).gather(1, b_action)  # shape (batch, 1)
        q_next = self.target_net(b_next_state).detach()  # detach from graph, don't backpropagate
        q_target = b_reward + self.GAMMA * q_next.max(1)[0].view(self.BATCH_SIZE, 1)  # shape (batch, 1)
        loss = self.loss_func(q_eval, q_target)
        losses = loss.cpu()
        losses = losses.detach().numpy()
        print('train loss MSE =', losses)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return losses
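The class above allocates self.memory and tracks self.memory_counter, but the routine that writes transitions into the replay buffer is not shown in this section. A minimal sketch of such a routine and of one interaction step, assuming the same row layout that learn() slices (state | action | reward | next state); the helper name and the commented loop are illustrative only:

import numpy as np

def store_transition(dqn, s, a, r, s_):
    """Write one transition into dqn.memory, overwriting the oldest row once the buffer is full."""
    transition = np.hstack((s, [a, r], s_))            # N_STATES + 2 + N_STATES values per row
    index = dqn.memory_counter % dqn.MEMORY_CAPACITY   # ring-buffer index
    dqn.memory[index, :] = transition
    dqn.memory_counter += 1

# illustrative use during the search loop:
#   a = dqn.choose_action(state)
#   next_state, reward = take_action(a)               # hypothetical environment / neighborhood step
#   store_transition(dqn, state, a, reward, next_state)
#   if dqn.memory_counter > dqn.MEMORY_CAPACITY:
#       dqn.learn()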