from phasic import Graph, with_ipv
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import seaborn as sns
from vscodenb import set_vscode_theme
sns.set_palette('tab10')
set_vscode_theme()
np.random.seed(42)Discrete distributions
Consider this simple graph representing an exponential distribution. If you must, you can make it the coalescence time of two lineages in very small population:
def exponential(state):
transitions = [] # List for transitions from state
if state[0] < 2: # Control transitions allowed
child = state.copy() # Make a copy to modify
child[0] += 1 # Make the child state
trans = [child, 2.0] # Pair the child with a transition rate
transitions.append(trans) # Append transition
return transitions
graph = Graph(exponential, ipv=[1])
graph.plot()def exponential(rate):
graph = Graph(1)
vertex = graph.find_or_create_vertex([2])
graph.starting_vertex().add_edge(vertex, 1)
abs_vertex = graph.find_or_create_vertex([1])
vertex.add_edge(abs_vertex, rate)
return graph
graph = exponential(0.4)
graph.plot()rate = 0.4
x = np.arange(0, 10, 0.01)
graph = exponential(rate)
plt.plot(x, graph.cdf(x))
plt.show()Discrete phase-type distributions represent the the number of jumps in a Markov Chain before absorption. We can discretize our graph it to get a geometric distribution:
The discretize method returns a new graph, by connecting each non-absorbing vertex in the graph to an auxiliary vertex with a specified rate. Each AUX vertex connects to its parent with rate 1. Once AUX vertices are added, the graph is normalized so the sum of edge weights sum to 1 for each vertex. discretize returns a new graph with a .rewards attribute containing the indices of the added AUX vertices.
x = np.arange(10)
cdf = graph.reward_transform(graph.rewards).cdf(x)
plt.hlines(y=cdf, xmin=x, xmax=x+1, zorder=1, color='lightblue')
plt.scatter(x, cdf, s=18)
plt.show()You can treat the discrete graph by passing discrete=False (omit the argument), in which case the returned distribution is over continuous time, rather than discrete jumps:
rate = 0.4
graph = exponential(rate)
graph = graph.discretize(1-rate)
x = np.arange(0, 10, 0.01)
cdf = graph.reward_transform(graph.rewards).cdf(x)
plt.plot(x, cdf)
plt.show()To model discrete mutations with rate 0.1 along branches of the coalescent tree. We could of course just make a new state-space creation function, but we can also manipulate an existing continuous one.
E.g. if you want to get the distribution of accumulated mutations over time rather than jumps.
Let’s model discrete mutations with rate 0.1 along branches of the coalescent tree. We could of course just make a new state-space creation function, but we can also manipulate an existing continuous one.
nr_samples = 4
@with_ipv([nr_samples]+[0]*(nr_samples-1))
def coalescent(state):
transitions = []
for i in range(state.size):
for j in range(i, state.size):
same = int(i == j)
if same and state[i] < 2:
continue
if not same and (state[i] < 1 or state[j] < 1):
continue
new = state.copy()
new[i] -= 1
new[j] -= 1
new[i+j+1] += 1
transitions.append((new, state[i]*(state[j]-same)/(1+same)))
return transitions
mutation_graph = Graph(coalescent)
mutation_graph.plot()This process is automated by the discretize method, which accepts either a single rate of discrete events for all state (E.g. graph.discretize(0.7)), or a callable mapping each state vector to a rate. The following example produces the mutation graph above:
from typing import Optional
mutation_graph = Graph(coalescent)
def mutation(state:np.ndarray, mutation_rate:float):
nr_lineages = sum(state)
return nr_lineages * mutation_rate
mutation_rate = 0.1
mutation_graph = mutation_graph.discretize(rate=mutation_rate)
rewards = mutation_graph.rewardsThe auxiliary nodes added have dummy states with all zeros and are labelled as “AUX” vertices when the graph is plotted:
If you want, you can assign nodes to subgraphs either by index or by state, by passing a callable as the by_index or by_state argument. For example, we can group the auxiliary nodes we added above into a separate subgraph:
def fun(index):
if rewards[index]:
return "Auxiliary states\ncollecting a\nreward (mutations)\neach time visited"
else:
return "Standard states"
mutation_graph.plot(by_index=fun)To find compute the expected number of mutations, we apply rewards such that the mutation vertex get a reward of 1 and all other states get reward of 0. Since this is already encoded in the mutation_vertices array we made above, we can convert it to integers and pass it as rewards:
mutation_graph.expectation(rewards)0.15
We can verify that the expected number of mutations correspond to the total branch length scaled with the mutation rate:
graph = Graph(coalescent)
tot_brlen = graph.expectation(graph.states().T.sum(axis=0))
tot_brlen * mutation_rate0.3666666666666667
Or compare it to emprical mean of samples from the models
samples = mutation_graph.sample(10000, rewards=rewards)
samples.mean().item()0.142
rewards = graph.states().T
rewards = rewards[:-1].sum(axis=0)
samples = graph.sample(10000, rewards=rewards)
samples.mean().item() * mutation_rate0.3642283761262491
If we reward transform the graph, we can find the distribution function for the the total number of mutations:
rev_trans_mutation_graph = mutation_graph.reward_transform_discrete(rewards)
rev_trans_mutation_graph.plot()Notice that with this reward transformation the graph is no longer sparse, as all paths through the graph are represented.
x = np.arange(20)
pmf = rev_trans_mutation_graph.pdf(x)
cdf = rev_trans_mutation_graph.cdf(x)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 3), sharey=True)
ax1.bar(x, pmf)
ax1.set_title("PDF")
ax1.set_xlabel('Total number of mutations')
left, right = x, np.arange(1, x.size+1, 1)
ax2.hlines(y=cdf, xmin=left, xmax=right, zorder=1)
ax2.scatter(left, cdf, s=18, zorder=2)
ax2.set_title("CDF")
ax2.set_xlabel('Total number of mutations')
ax2.axhline(y=mutation_graph.defect(), linestyle='dotted') ;Multivariate phase-type distributions (marginal)
We can compute marginal expectations using rewards. Instead of a univariate reward, we can have the distribution earn a vector of rewards for every time unit spent at each vertex. We can use the columns in our state matrix as reward vectors. Singleton rewards are:
graph = Graph(coalescent)
# mutation_graph = graph.discretize(rate=1)
mutation_graph = graph.discretize(rate=0.999999999)
mutation_graph.plot()mutation_graph.rewardsarray([0, 0, 0, 0, 0, 0, 1, 1, 1, 1])
To get the marginal distributions we need to first construct the reward matrix for reward-transformations:
(FIXME: Would be nice to be able to just supply rewards as if the graph was cont)
orig_graph_rev_idx = np.arange(1, graph.vertices_length()-1)
rewards = np.concatenate([graph.states().T, graph.states().T[:,orig_graph_rev_idx]], axis=1)
rewards = rewards[:-1]
rewards = rewards * mutation_graph.rewards
rewardsarray([[0, 0, 0, 0, 0, 0, 4, 2, 0, 1],
[0, 0, 0, 0, 0, 0, 0, 1, 2, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1]])
[mutation_graph.expectation(r) for r in rewards][1.999999998, 0.999999999, 0.666666666]
singleton_graph = mutation_graph.reward_transform(rewards[0])
singleton_graph.plot(nodesep=0.3)fig, axes = plt.subplots(2, 3, figsize=(8, 5))
x = np.arange(0, 15)
for i, r in enumerate(rewards):
# reward transform and compute pdf (pmf) and cdf:
revt_graph = mutation_graph.reward_transform(r)
pmf = revt_graph.pdf(x)
cdf = revt_graph.cdf(x)
# plot it
axes[0, i].set_title(f"{i+1}'ton PDF")
axes[0, i].bar(x, pmf)
axes[0, i].set_xlabel('Mutations')
axes[0, i].set_ylim(0, 0.2)
left, right = x, np.arange(1, x.size+1, 1)
axes[1, i].hlines(y=cdf, xmin=left, xmax=right, zorder=1)
axes[1, i].scatter(left, cdf, s=18, zorder=2)
axes[1, i].set_title(f"{i+1}'ton CDF")
axes[1, i].set_xlabel('Mutations')
axes[1, i].set_ylim(0, 1.05)
plt.tight_layout()