pomdp_belief_tracking.pf package
Submodules
pomdp_belief_tracking.pf.particle_filter module
Implementation of particle filters (PF) [particle-filtering]
- [particle-filtering]
Djuric, P. M., Kotecha, J. H., Zhang, J., Huang, Y., Ghirmai, T., Bugallo, M. F., & Miguez, J. (2003). Particle filtering. IEEE Signal Processing Magazine, 20(5), 19-38.
Particle filters approximate distributions with weighted particles. A Particle is a State with a weight, and the weight corresponds to the state's relative probability.
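As a minimal sketch of the weighted-particle idea (not the package's own code), the snippet below models a particle as a named tuple and normalizes relative weights into probabilities. The names WeightedParticle and normalized are hypothetical and exist only for this illustration.

```python
from typing import Any, List, NamedTuple


class WeightedParticle(NamedTuple):
    """Hypothetical stand-in for the package's Particle (a state plus a weight)."""

    state: Any
    weight: float


def normalized(particles: List[WeightedParticle]) -> List[WeightedParticle]:
    """Rescale weights so they sum to one, turning them into probabilities."""
    total = sum(p.weight for p in particles)
    if total <= 0:
        raise ValueError("total weight must be positive")
    return [WeightedParticle(p.state, p.weight / total) for p in particles]


# Two particles: "right" is three times as likely as "left".
belief = normalized([WeightedParticle("left", 1.0), WeightedParticle("right", 3.0)])
```

Only the ratio between weights matters before normalization, which is why the weights are described as relative probabilities.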
-
class pomdp_belief_tracking.pf.particle_filter.Particle(state: State, weight: float)[source]
A (weighted) particle contains a state and weight
Create new instance of Particle(state, weight)
-
property state
The ‘particle’ or value
-
property weight
The (relative) probability of the particle
The particle filter is a StateDistribution, but provides some additional API: mainly convenient constructors and the ability to use the filter as a container.
-
class pomdp_belief_tracking.pf.particle_filter.ParticleFilter(states)[source]
A distribution from weighted particles
Creates a particle filter from provided states
Provides uniform weights to states
- Parameters
states (Sequence[State]) – the particles
-
__call__()[source]
Implements the pomdp_belief_tracking.types.StateDistribution protocol: sample states
Simply samples a state according to the distribution specified by the particles.
Returns the actual state (not a copy), so beware of changing it, as doing so will affect the distribution.
- Return type
- Returns
a sample state
-
__contains__(item)[source]
Checks whether item is a state in our particles
Allows for the “s in particle_filter” syntax
- Parameters
item – the item to check for
-
__iter__()[source]
Returns an iterator over the particles
Allows for the “for state, weight in particle_filter:” syntax
-
__len__()[source]
Returns the number of particles
Allows for the “len(particle_filter)” syntax
-
effective_sample_size()[source]
Returns the “effective sample size” of the particle filter
Calls effective_sample_size()
- Return type
float
- Returns
effective sample size of self
-
static from_distribution(distr, n)[source]
Constructs a particle filter of n particles from distr
Basically samples n particles.
NOTE: Does not copy samples drawn from distr. Make sure distr() copies the particles if necessary
- Parameters
distr (StateDistribution) – the distribution to approximate
n (int) – the number of particles to approximate with
- Return type
- Returns
a particle filter approximating distr with n particles
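The copying caveat can be illustrated with a small, self-contained sketch. It assumes a distribution is any zero-argument callable returning a state. The helper particles_from_distribution and the distribution copying_distr are hypothetical names, not part of the package.

```python
import copy
import random
from typing import Any, Callable, List, Tuple


def particles_from_distribution(
    distr: Callable[[], Any], n: int
) -> List[Tuple[Any, float]]:
    """Draw n samples from distr and give each the uniform weight 1/n."""
    if n <= 0:
        raise ValueError("n must be positive")
    # The samples are stored as returned: no copy is made here.
    return [(distr(), 1.0 / n) for _ in range(n)]


rng = random.Random(0)
template = {"pos": 0}


def copying_distr() -> dict:
    """Hypothetical distribution that returns a fresh copy on every call."""
    state = copy.deepcopy(template)
    state["pos"] = rng.randint(0, 3)
    return state


particles = particles_from_distribution(copying_distr, 5)
# Every particle is its own object, so mutating one cannot affect another.
distinct_objects = len({id(state) for state, _ in particles})
```

Had copying_distr returned template itself, all five particles would alias one mutable object, which is the situation the note warns about.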
-
static from_particles(particles)[source]
Creates a particle filter from particles
Normalizes the weights in particles
Note: Does not copy any of the particles, and particles may change in the future through belief updates
- Parameters
particles (Iterable[Particle]) – the particles to create the filter from
- Return type
- Returns
a particle filter from given particles
-
probability_of(s, equality_function=<built-in function eq>)[source]
Returns the probability of s, where states are compared to s using equality_function
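One plausible reading of this method, sketched under the assumption that the probability of s is the normalized weight of all particles that compare equal to s. The standalone function below takes the particles explicitly and is not the package's implementation.

```python
import operator
from typing import Any, Callable, Iterable, Tuple


def probability_of(
    particles: Iterable[Tuple[Any, float]],
    s: Any,
    equality_function: Callable[[Any, Any], bool] = operator.eq,
) -> float:
    """Sum the weight of particles matching s, divided by the total weight."""
    particles = list(particles)
    total = sum(weight for _, weight in particles)
    matching = sum(
        weight for state, weight in particles if equality_function(state, s)
    )
    return matching / total


particles = [((0, 0), 1.0), ((0, 1), 1.0), ((0, 0), 2.0)]
p_exact = probability_of(particles, (0, 0))
# A custom equality lets states match on a feature, here the first coordinate.
p_same_row = probability_of(particles, (0, 5), lambda a, b: a[0] == b[0])
```

A custom equality function is useful when states carry fields that should be ignored in the comparison.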
-
class pomdp_belief_tracking.pf.particle_filter.Particle(state: State, weight: float)[source]
Bases: tuple
A (weighted) particle contains a state and weight
Create new instance of Particle(state, weight)
-
class pomdp_belief_tracking.pf.particle_filter.ParticleFilter(states)[source]
Bases: pomdp_belief_tracking.types.StateDistribution
A distribution from weighted particles
Creates a particle filter from provided states
Provides uniform weights to states
- Parameters
states (Sequence[State]) – the particles
-
pomdp_belief_tracking.pf.particle_filter.apply(f, pf)[source]
Returns a new ParticleFilter with f applied on states in pf
Note: assumes f does not affect the input State, otherwise the input pf will be affected too
- Parameters
f (Callable[[State], State]) – the function to apply to particles
pf (ParticleFilter) – input starting particle filter / belief
- Return type
- Returns
the result of applying f onto pf
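The behaviour described above can be sketched on a plain list of (state, weight) pairs. This is an illustration under the assumption that weights are carried over unchanged; apply_sketch and move_right are hypothetical names.

```python
from typing import Any, Callable, List, Tuple

Particles = List[Tuple[Any, float]]


def apply_sketch(f: Callable[[Any], Any], particles: Particles) -> Particles:
    """Map f over the states and keep each particle's weight."""
    return [(f(state), weight) for state, weight in particles]


def move_right(state: Tuple[int, int]) -> Tuple[int, int]:
    """Pure function: builds a new state instead of mutating its input."""
    x, y = state
    return (x + 1, y)


belief = [((0, 0), 0.25), ((2, 1), 0.75)]
shifted = apply_sketch(move_right, belief)
```

Because move_right returns a new tuple, belief is left untouched. A function that mutated its argument in place would change both beliefs at once.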
-
pomdp_belief_tracking.pf.particle_filter.effective_sample_size(weights, total_weight=None)[source]
Computes the “effective sample size” of the given weights
This value represents how “healthy” the underlying samples are. The lower this value, the fewer “real samples” are represented: the closer it comes to zero, the more degenerate the samples are.
See https://en.wikipedia.org/wiki/Effective_sample_size
- Parameters
weights (Iterable[float]) – the weights of the samples
total_weight (Optional[float]) – total weight of all samples, requires extra computation if not given
- Return type
float
- Returns
the effective sample size of weights
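The documentation does not state which estimator is used. A common choice, and the one described on the linked page, is Kish's formula (Σw)² / Σw². The sketch below assumes that formula, and kish_effective_sample_size is a hypothetical name.

```python
from typing import Iterable, Optional


def kish_effective_sample_size(
    weights: Iterable[float], total_weight: Optional[float] = None
) -> float:
    """Kish's estimate: squared total weight over the sum of squared weights."""
    weights = list(weights)
    if total_weight is None:
        # This is the extra computation avoided when the total is supplied.
        total_weight = sum(weights)
    return total_weight ** 2 / sum(w * w for w in weights)


# Uniform weights: every particle counts fully.
uniform = kish_effective_sample_size([0.25, 0.25, 0.25, 0.25])
# One dominant particle: the set behaves like roughly a single sample.
skewed = kish_effective_sample_size([0.97, 0.01, 0.01, 0.01])
```

Under this particular definition the value lies between 1 (one particle holds all the weight) and the number of particles (uniform weights).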
pomdp_belief_tracking.pf.rejection_sampling module
pomdp_belief_tracking.pf.importance_sampling module
Provides a general implementation of importance sampling:
-
pomdp_belief_tracking.pf.importance_sampling.general_importance_sample(proposal_distr, weight_func, particles)[source]
The particle filter implementation of IS
The underlying algorithm for importance sampling uses a ProposalDistribution and a weighting function to update a particle filter:
for weight, sample in particles:
    sample ~ proposal_distr(sample)
    weight <- weight * weight_func(sample)
Returns how long the update took in info with key “belief_update_runtime”
- Parameters
proposal_distr (ProposalDistribution) – function to propose sample updates
weight_func (WeightFunction) – function that weights proposals
particles (Iterable[Particle]) – the starting set of particles
- Return type
Tuple[ParticleFilter, Dict[str, Any]]
- Returns
a new particle set
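The pseudocode above can be made concrete with a small runnable sketch. It simplifies the documented protocols: the proposal and weight functions here take only the state, the final normalization step is an assumption, and importance_sample_loop is a hypothetical name.

```python
import time
from typing import Any, Callable, Dict, Iterable, List, Tuple

Particles = List[Tuple[Any, float]]


def importance_sample_loop(
    propose: Callable[[Any], Any],
    weigh: Callable[[Any], float],
    particles: Iterable[Tuple[Any, float]],
) -> Tuple[Particles, Dict[str, Any]]:
    """Propose a new state per particle and scale its weight accordingly."""
    start = time.perf_counter()
    updated = []
    for state, weight in particles:
        proposal = propose(state)
        updated.append((proposal, weight * weigh(proposal)))
    total = sum(w for _, w in updated)
    if total <= 0:
        raise ValueError("all proposals received zero weight")
    # Assumption: weights are renormalized after the update.
    normalized = [(s, w / total) for s, w in updated]
    info = {"belief_update_runtime": time.perf_counter() - start}
    return normalized, info


new_particles, info = importance_sample_loop(
    propose=lambda s: s + 1,
    weigh=lambda s: 1.0 if s % 2 == 0 else 0.5,
    particles=[(1, 0.5), (2, 0.5)],
)
```

In this toy run the particle that lands on an even state keeps its full weight and ends up twice as probable as the other.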
Our belief update version of importance sampling calls this function with the appropriate parameters:
-
pomdp_belief_tracking.pf.importance_sampling.importance_sample(transition_func, observation_model, n, initial_state_distribution, a, o)[source]
Applies general_importance_sample() on POMDPs
Here the transition_func is used to propose next states, which are weighted according to the weight_func given o. If initial_state_distribution is not a particle filter (with a given size), then we sample n particles with weight 1 to start IS. Otherwise we use the particles in the PF.
n is necessary when initial_state_distribution is not a ParticleFilter. Otherwise ignored.
- Parameters
transition_func (TransitionFunction) – the proposal function
observation_model (Callable[[State, Action, State, Observation], float]) – the model to weight the probability of generating observation o
n (Optional[int]) – num samples, optional
initial_state_distribution (StateDistribution) – the starting distribution
a (Action) – taken action
o (Observation) – taken observation
- Return type
Tuple[ParticleFilter, Dict[str, Any]]
- Returns
updated belief
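As an illustration of the POMDP case, the sketch below runs one belief update on a toy corridor. It assumes the transition function takes (state, action) and returns a next state, which the documentation does not spell out. The observation model follows the documented (state, action, next state, observation) signature. All function names are hypothetical.

```python
from typing import Callable, List, Tuple

Particles = List[Tuple[int, float]]


def pomdp_importance_sample(
    transition: Callable[[int, int], int],
    observation_model: Callable[[int, int, int, int], float],
    particles: Particles,
    a: int,
    o: int,
) -> Particles:
    """Propose next states with the dynamics, weight them by the observation."""
    updated = []
    for s, w in particles:
        s_next = transition(s, a)
        updated.append((s_next, w * observation_model(s, a, s_next, o)))
    total = sum(w for _, w in updated)
    return [(s, w / total) for s, w in updated]


def transition(s: int, a: int) -> int:
    """Deterministic move along a corridor with cells 0..4."""
    return min(max(s + a, 0), 4)


def observation_model(s: int, a: int, s_next: int, o: int) -> float:
    """Noisy position sensor: usually reports the true cell."""
    return 0.8 if o == s_next else 0.1


belief = [(0, 1.0), (1, 1.0), (2, 1.0)]
posterior = pomdp_importance_sample(transition, observation_model, belief, a=1, o=2)
most_likely = max(posterior, key=lambda p: p[1])[0]
```

After moving right and observing cell 2, most of the probability mass sits on the particle that actually ended up in cell 2.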
This belief update is best created through our construction function (with sane defaults; otherwise you can also apply partial yourself):
-
pomdp_belief_tracking.pf.importance_sampling.create_importance_sampling(transition_func, observation_model, n)[source]
Partial function that returns a regular IS belief update
A simple wrapper around importance_sample()
Here the transition_func is used to propose next states, which are weighted according to the weight_func. If the belief to update is not a particle filter (with a given size), then we sample n particles with weight 1 to start IS. Otherwise we use the particles in the PF.
n is necessary when initial_state_distribution is not a ParticleFilter. Otherwise ignored.
- Parameters
transition_func (TransitionFunction) – how to update states
observation_model (Callable[[State, Action, State, Observation], float]) – how to weight transitions
n (Optional[int]) – num samples, optional
- Return type
- Returns
importance_sample() as a pomdp_belief_tracking.types.BeliefUpdate
Sequential importance sampling, the application of the belief update over multiple time steps, often involves resample() to avoid particle degeneracy. When to resample is not straightforward; we provide a general condition protocol:
-
class pomdp_belief_tracking.pf.importance_sampling.ResampleCondition(*args, **kwds)[source]
The signature of a resample condition
-
__call__(pf)[source]
Inspects pf and decides whether it is time to re-sample
- Parameters
pf (ParticleFilter) – the particle filter to potentially resample
- Return type
bool
- Returns
True if pf should be resampled
Provided implementations:
ineffective_sample_size(): returns whether the sample size of pf is lower than minimal_size
Lastly, we provide a factory function that combines importance sampling with resampling
-
pomdp_belief_tracking.pf.importance_sampling.create_sequential_importance_sampling(resample_condition, transition_func, observation_model, n=None)[source]
Main entry point of this module to create importance sampling update
A simple wrapper combining resample() (if resample_condition is met) with importance_sample() (created by calling create_importance_sampling())
n is necessary when initial_state_distribution is not a ParticleFilter. Otherwise ignored.
- Parameters
resample_condition (ResampleCondition) – when to resample (called before IS)
transition_func (TransitionFunction) – the transition function to propose particles
observation_model (Callable[[State, Action, State, Observation], float]) – the function to weight the new particles
n (Optional[int]) – number of desired particles
- Return type
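A rough sketch of how such a factory could combine the two steps, written against plain lists of (state, weight) pairs. It assumes multinomial resampling via random.choices and a (state, action) transition signature. create_sis_sketch and its rng argument are hypothetical and only mirror the documented behaviour of checking the resample condition before importance sampling.

```python
import random
from typing import Any, Callable, List, Tuple

Particles = List[Tuple[Any, float]]


def create_sis_sketch(
    resample_condition: Callable[[Particles], bool],
    transition: Callable[[Any, Any], Any],
    observation_model: Callable[[Any, Any, Any, Any], float],
    n: int,
    rng: random.Random,
) -> Callable[[Particles, Any, Any], Particles]:
    """Build a belief update that resamples when needed, then applies IS."""

    def update(particles: Particles, a: Any, o: Any) -> Particles:
        if resample_condition(particles):
            # Multinomial resampling: draw n states proportionally to weight.
            states = [s for s, _ in particles]
            weights = [w for _, w in particles]
            drawn = rng.choices(states, weights=weights, k=n)
            particles = [(s, 1.0 / n) for s in drawn]
        updated = []
        for s, w in particles:
            s_next = transition(s, a)
            updated.append((s_next, w * observation_model(s, a, s_next, o)))
        total = sum(w for _, w in updated)
        return [(s, w / total) for s, w in updated]

    return update


update = create_sis_sketch(
    resample_condition=lambda pf: len(pf) < 4,
    transition=lambda s, a: s + a,
    observation_model=lambda s, a, s_next, o: 0.9 if s_next == o else 0.1,
    n=4,
    rng=random.Random(1),
)
belief = update([(0, 0.5), (1, 0.5)], a=1, o=2)
```

The returned closure has the shape of a belief update, taking a belief, an action, and an observation, so it can be applied step after step.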
-
class pomdp_belief_tracking.pf.importance_sampling.ResampleCondition(*args, **kwds)[source]
Bases: typing_extensions.Protocol
The signature of a resample condition
-
class pomdp_belief_tracking.pf.importance_sampling.WeightFunction(*args, **kwds)[source]
Bases: typing_extensions.Protocol
Signature of a weighting function in general_importance_sample()
-
__call__(proposal, sample_ctx, info)[source]
Weights a state->proposal transition under sample_ctx
- Parameters
proposal (State) – proposed (updated) sample
sample_ctx (Any) – context around proposal
info (Dict[str, Any]) – global information stored during importance sampling
- Return type
float
- Returns
a weight, with 0 <= weight <= 1
-
pomdp_belief_tracking.pf.importance_sampling.ineffective_sample_size(minimal_size, pf)[source]
Returns whether the effective sample size of pf is lower than minimal_size
When given minimal_size this implements the ResampleCondition protocol. Asserts that minimal_size > 0
Calls effective_sample_size() under the hood
- Parameters
minimal_size (float) – the required sample size for this to return False (> 0)
pf (ParticleFilter) – the particle filter to test the sample size of
- Returns
True if minimal_size > sample size of pf
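"When given minimal_size" suggests binding the first argument, for example with functools.partial, to obtain a one-argument condition. The sketch below shows that pattern on plain (state, weight) lists. It assumes Kish's formula for the effective sample size, and the function names are hypothetical.

```python
from functools import partial
from typing import Any, Iterable, List, Tuple

Particles = List[Tuple[Any, float]]


def kish_ess(weights: Iterable[float]) -> float:
    """Assumed estimator: squared total weight over sum of squared weights."""
    weights = list(weights)
    return sum(weights) ** 2 / sum(w * w for w in weights)


def ineffective_sample_size_sketch(minimal_size: float, particles: Particles) -> bool:
    """True when the effective sample size falls below minimal_size."""
    assert minimal_size > 0, "minimal_size must be positive"
    return minimal_size > kish_ess(w for _, w in particles)


# Binding minimal_size leaves a callable that takes only the particles.
needs_resampling = partial(ineffective_sample_size_sketch, 2.0)

healthy = [(0, 0.25), (1, 0.25), (2, 0.25), (3, 0.25)]
degenerate = [(0, 0.97), (1, 0.01), (2, 0.01), (3, 0.01)]
healthy_flag = needs_resampling(healthy)
degenerate_flag = needs_resampling(degenerate)
```

The bound callable has the pf -> bool shape that a resample condition requires.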
-
pomdp_belief_tracking.pf.importance_sampling.resample(pf, n)[source]
Samples n particles from pf
- Parameters
pf (ParticleFilter) – incoming particle filter
n (int) – number of desired samples in returned PF
- Return type
- Returns
the resulting particle filter of resampling pf
pomdp_belief_tracking.pf.types module
Some additional (shared) particle filtering types
Module contents
Particle Filters and their update functions
Particle filters are approximations of distributions. They represent them through Particle, a State with a relative weight.
In this module we implement such a ParticleFilter and some functions to update them:
rejection_sample()