Python реализация алгоритма Витерби
Я делаю проект на Python, в котором я хотел бы использовать алгоритм Витерби. Кто-нибудь знает о полной реализации алгоритма Витерби на Python? Корректность одного из них в Википедии, похоже, находится под вопросом на странице обсуждения. У кого-нибудь есть указатель?
7 ответов:
Хм, я могу опубликовать свой. Это не очень красиво, хотя, пожалуйста, дайте мне знать, если вам нужно разъяснение. Я написал это относительно недавно специально для части речи пометки.
class Trellis: trell = [] def __init__(self, hmm, words): self.trell = [] temp = {} for label in hmm.labels: temp[label] = [0,None] for word in words: self.trell.append([word,copy.deepcopy(temp)]) self.fill_in(hmm) def fill_in(self,hmm): for i in range(len(self.trell)): for token in self.trell[i][1]: word = self.trell[i][0] if i == 0: self.trell[i][1][token][0] = hmm.e(token,word) else: max = None guess = None c = None for k in self.trell[i-1][1]: c = self.trell[i-1][1][k][0] + hmm.t(k,token) if max == None or c > max: max = c guess = k max += hmm.e(token,word) self.trell[i][1][token][0] = max self.trell[i][1][token][1] = guess def return_max(self): tokens = [] token = None for i in range(len(self.trell)-1,-1,-1): if token == None: max = None guess = None for k in self.trell[i][1]: if max == None or self.trell[i][1][k][0] > max: max = self.trell[i][1][k][0] token = self.trell[i][1][k][1] guess = k tokens.append(guess) else: tokens.append(token) token = self.trell[i][1][token][1] tokens.reverse() return tokens
Я нашел следующий код в Примере репозитория искусственного интеллекта: современный подход. Что-то вроде этого ты ищешь?
def viterbi_segment(text, P): """Find the best segmentation of the string of characters, given the UnigramTextModel P.""" # best[i] = best probability for text[0:i] # words[i] = best word ending at position i n = len(text) words = [''] + list(text) best = [1.0] + [0.0] * n ## Fill in the vectors best, words via dynamic programming for i in range(n+1): for j in range(0, i): w = text[j:i] if P[w] * best[i - len(w)] >= best[i]: best[i] = P[w] * best[i - len(w)] words[i] = w ## Now recover the sequence of best words sequence = []; i = len(words)-1 while i > 0: sequence[0:0] = [words[i]] i = i - len(words[i]) ## Return sequence of best words and overall probability return sequence, best[-1]
Я только что исправил псевдо-реализацию Витерби в Википедии. Из первоначальной (неправильной) версии мне потребовалось некоторое время, чтобы понять, где я ошибался, но в конце концов мне это удалось, отчасти благодаря реализации Кевином Мерфи
viterbi_path.m
в MATLAB HMM toolbox.В контексте объекта HMM с переменными, как показано:
hmm = HMM() hmm.priors = np.array([0.5, 0.5]) # pi = prior probs hmm.transition = np.array([[0.75, 0.25], # A = transition probs. / 2 states [0.32, 0.68]]) hmm.emission = np.array([[0.8, 0.1, 0.1], # B = emission (observation) probs. / 3 obs modes [0.1, 0.2, 0.7]])
Функция Python для запуска алгоритма Viterbi (best-path) приведена ниже:
def viterbi (self,observations): """Return the best path, given an HMM model and a sequence of observations""" # A - initialise stuff nSamples = len(observations[0]) nStates = self.transition.shape[0] # number of states c = np.zeros(nSamples) #scale factors (necessary to prevent underflow) viterbi = np.zeros((nStates,nSamples)) # initialise viterbi table psi = np.zeros((nStates,nSamples)) # initialise the best path table best_path = np.zeros(nSamples); # this will be your output # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b) viterbi[:,0] = self.priors.T * self.emission[:,observations(0)] c[0] = 1.0/np.sum(viterbi[:,0]) viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor psi[0] = 0; # C- Do the iterations for viterbi and psi for time>0 until T for t in range(1,nSamples): # loop through time for s in range (0,nStates): # loop through the states @(t-1) trans_p = viterbi[:,t-1] * self.transition[:,s] psi[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1)) viterbi[s,t] = viterbi[s,t]*self.emission[s,observations(t)] c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor viterbi[:,t] = c[t] * viterbi[:,t] # D - Back-tracking best_path[nSamples-1] = viterbi[:,nSamples-1].argmax() # last state for t in range(nSamples-1,0,-1): # states of (last-1)th to 0th time step best_path[t-1] = psi[best_path[t],t] return best_path
Вот мой. Его перефразировали непосредственно изpsuedocode implemenation из Википедии . Он использует
numpy
для конвейеризации ихndarray
, но в остальном является чистой реализацией python3.import numpy as np def viterbi(y, A, B, Pi=None): """ Return the MAP estimate of state trajectory of Hidden Markov Model. Parameters ---------- y : array (T,) Observation state sequence. int dtype. A : array (K, K) State transition matrix. See HiddenMarkovModel.state_transition for details. B : array (K, M) Emission matrix. See HiddenMarkovModel.emission for details. Pi: optional, (K,) Initial state probabilities: Pi[i] is the probability x[0] == i. If None, uniform initial distribution is assumed (Pi[:] == 1/K). Returns ------- x : array (T,) Maximum a posteriori probability estimate of hidden state trajectory, conditioned on observation sequence y under the model parameters A, B, Pi. T1: array (K, T) the probability of the most likely path so far T2: array (K, T) the x_j-1 of the most likely path so far """ # Cardinality of the state space K = A.shape[0] # Initialize the priors with default (uniform dist) if not given by caller Pi = Pi if Pi is not None else np.full(K, 1 / K) T = len(y) T1 = np.empty((K, T), 'd') T2 = np.empty((K, T), 'B') # Initilaize the tracking tables from first observation T1[:, 0] = Pi * B[:, y[0]] T2[:, 0] = 0 # Iterate throught the observations updating the tracking tables for i in range(1, T): T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1) T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1) # Build the output, optimal model trajectory x = np.empty(T, 'B') x[-1] = np.argmax(T1[:, T - 1]) for i in reversed(range(1, T)): x[i - 1] = T2[x[i], i] return x, T1, T2
Это старый вопрос, но ни один из других ответов не был вполне тем, что мне нужно, потому что мое приложение не имеет конкретных наблюдаемых состояний.
Взяв за основу @Rhubarb, я также повторно реализовал реализацию MatlabКевина Мерфи (см.
viterbi_path.m
), но я сохранил ее ближе к оригиналу. Я также включил простой тестовый случай.Обратите внимание, что эта реализация не использует вероятности выбросов напрямую, а использует переменнуюimport numpy as np def viterbi_path(prior, transmat, obslik, scaled=True, ret_loglik=False): '''Finds the most-probable (Viterbi) path through the HMM state trellis Notation: Z[t] := Observation at time t Q[t] := Hidden state at time t Inputs: prior: np.array(num_hid) prior[i] := Pr(Q[0] == i) transmat: np.ndarray((num_hid,num_hid)) transmat[i,j] := Pr(Q[t+1] == j | Q[t] == i) obslik: np.ndarray((num_hid,num_obs)) obslik[i,t] := Pr(Z[t] | Q[t] == i) scaled: bool whether or not to normalize the probability trellis along the way doing so prevents underflow by repeated multiplications of probabilities ret_loglik: bool whether or not to return the log-likelihood of the best path Outputs: path: np.array(num_obs) path[t] := Q[t] ''' num_hid = obslik.shape[0] # number of hidden states num_obs = obslik.shape[1] # number of observations (not observation *states*) # trellis_prob[i,t] := Pr((best sequence of length t-1 goes to state i), Z[1:(t+1)]) trellis_prob = np.zeros((num_hid,num_obs)) # trellis_state[i,t] := best predecessor state given that we ended up in state i at t trellis_state = np.zeros((num_hid,num_obs), dtype=int) # int because its elements will be used as indicies path = np.zeros(num_obs, dtype=int) # int because its elements will be used as indicies trellis_prob[:,0] = prior * obslik[:,0] # element-wise mult if scaled: scale = np.ones(num_obs) # only instantiated if necessary to save memory scale[0] = 1.0 / np.sum(trellis_prob[:,0]) trellis_prob[:,0] *= scale[0] trellis_state[:,0] = 0 # arbitrary value since t == 0 has no predecessor for t in xrange(1, num_obs): for j in xrange(num_hid): trans_probs = trellis_prob[:,t-1] * transmat[:,j] # element-wise mult trellis_state[j,t] = trans_probs.argmax() trellis_prob[j,t] = trans_probs[trellis_state[j,t]] # max of trans_probs trellis_prob[j,t] *= obslik[j,t] if scaled: scale[t] = 1.0 / np.sum(trellis_prob[:,t]) trellis_prob[:,t] *= scale[t] path[-1] = trellis_prob[:,-1].argmax() for t in range(num_obs-2, -1, -1): path[t] = trellis_state[(path[t+1]), t+1] if not ret_loglik: return path else: if scaled: loglik = -np.sum(np.log(scale)) else: p = trellis_prob[path[-1],-1] loglik = np.log(p) return path, loglik if __name__=='__main__': # Assume there are 3 observation states, 2 hidden states, and 5 observations priors = np.array([0.5, 0.5]) transmat = np.array([ [0.75, 0.25], [0.32, 0.68]]) emmat = np.array([ [0.8, 0.1, 0.1], [0.1, 0.2, 0.7]]) observations = np.array([0, 1, 2, 1, 0], dtype=int) obslik = np.array([emmat[:,z] for z in observations]).T print viterbi_path(priors, transmat, obslik) #=> [0 1 1 1 0] print viterbi_path(priors, transmat, obslik, scaled=False) #=> [0 1 1 1 0] print viterbi_path(priors, transmat, obslik, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -7.776472586614755) print viterbi_path(priors, transmat, obslik, scaled=False, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -8.0120386579275227)
obslik
. Обычно,emissions[i,j] := Pr(observed_state == j | hidden_state == i)
для конкретного наблюдаемого состоянияi
, составляяemissions.shape == (num_hidden_states, num_obs_states)
. Однако, учитывая последовательностьobservations[t] := observation at time t
, Все, что требуется алгоритму Витерби, - это вероятность этого наблюдения для каждого скрытого состояния. Следовательно,obslik[i,t] := Pr(observations[t] | hidden_state == i)
. Фактическое значение наблюдаемого состояния не обязательно.
Я модифицировал ответ @Rhubarb для условия, когда предельные вероятности уже известны (например, путем вычисления прямого обратного алгоритма).
def viterbi (transition_probabilities, conditional_probabilities): # Initialise everything num_samples = conditional_probabilities.shape[1] num_states = transition_probabilities.shape[0] # number of states c = np.zeros(num_samples) #scale factors (necessary to prevent underflow) viterbi = np.zeros((num_states,num_samples)) # initialise viterbi table best_path_table = np.zeros((num_states,num_samples)) # initialise the best path table best_path = np.zeros(num_samples).astype(np.int32) # this will be your output # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b) viterbi[:,0] = conditional_probabilities[:,0] c[0] = 1.0/np.sum(viterbi[:,0]) viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor # C- Do the iterations for viterbi and psi for time>0 until T for t in range(1, num_samples): # loop through time for s in range (0,num_states): # loop through the states @(t-1) trans_p = viterbi[:, t-1] * transition_probabilities[:,s] # transition probs of each state transitioning best_path_table[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1)) viterbi[s,t] = viterbi[s,t] * conditional_probabilities[s][t] c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor viterbi[:,t] = c[t] * viterbi[:,t] ## D - Back-tracking best_path[num_samples-1] = viterbi[:,num_samples-1].argmax() # last state for t in range(num_samples-1,0,-1): # states of (last-1)th to 0th time step best_path[t-1] = best_path_table[best_path[t],t] return best_path