pomdp_belief_tracking.pf package

Submodules

pomdp_belief_tracking.pf.particle_filter module

Implementation of particle filters (PF [particle-filtering])

particle-filtering

Djuric, P. M., Kotecha, J. H., Zhang, J., Huang, Y., Ghirmai, T., Bugallo, M. F., & Miguez, J. (2003). Particle filtering. IEEE signal processing magazine, 20(5), 19-38.

PFs approximate distributions with weighted particles. A Particle is a State, and its weight corresponds to the relative probability of that state.

class pomdp_belief_tracking.pf.particle_filter.Particle(state: State, weight: float)[source]

A (weighted) particle contains a state and weight

Create new instance of Particle(state, weight)

property state

The ‘particle’ or value

property weight

The (relative) probability of the particle
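
As a rough illustration of the idea, the sketch below pairs states with weights in a named tuple. SketchParticle is a hypothetical stand-in written for this example, not the package's Particle class; it only assumes that a particle is a (state, weight) tuple, as documented above.

```python
from typing import Any, NamedTuple


class SketchParticle(NamedTuple):
    """Hypothetical stand-in for Particle: a state paired with a weight."""

    state: Any
    weight: float


# Three weighted particles over a toy state space of strings
particles = [
    SketchParticle("left", 1.0),
    SketchParticle("right", 2.0),
    SketchParticle("right", 1.0),
]

# Weights are relative, so probabilities come from dividing by the total
total = sum(p.weight for p in particles)
p_right = sum(p.weight for p in particles if p.state == "right") / total

# Being a tuple, a particle unpacks as (state, weight)
state, weight = particles[0]
```

Because the weights are relative, only their ratios matter until they are normalized.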

The particle filter is a StateDistribution that provides some additional API: mainly convenient constructors and the ability to be used as a container.

class pomdp_belief_tracking.pf.particle_filter.ParticleFilter(states)[source]

A distribution from weighted particles

Creates a particle filter from provided states

Provides uniform weights to states

Parameters

states (Sequence[State]) – the particles

__call__()[source]

Implements pomdp_belief_tracking.types.StateDistribution protocol: sample states

Simply samples a state according to the distribution specified by the particles.

Returns the actual state, so beware of changing it, as it will affect the distribution.

Return type

State

Returns

a sample state
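
The aliasing warning above can be made concrete with a small sketch. The sample_state helper and the dictionary states are assumptions for illustration only; the sketch uses the standard library's random.choices to draw a state in proportion to its weight, which may differ from how the package samples internally.

```python
import random

states = [{"x": 0}, {"x": 1}]  # mutable toy states
weights = [0.25, 0.75]


def sample_state(rng: random.Random) -> dict:
    """Hypothetical sampler: draws one state in proportion to its weight."""
    return rng.choices(states, weights=weights, k=1)[0]


rng = random.Random(0)
sample = sample_state(rng)

# The sample is the stored object itself, not a copy
is_same_object = any(sample is stored for stored in states)

# Mutating the sample therefore also changes the underlying particle
sample["x"] = 99
```

Copying the returned state before modifying it avoids this side effect.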

__contains__(item)[source]

Checks whether item is a state in our particles

Allows for s in particle_filter syntax

Parameters

item – the item to check for

__iter__()[source]

Returns iterator over the particles

Allows iterating with: for state, weight in particle_filter

__len__()[source]

Returns the number of particles

Allows for len(particle_filter) syntax

effective_sample_size()[source]

Returns the “effective sample size” of the particle filter

Calls the module-level function effective_sample_size() on the weights of the particles

Return type

float

Returns

effective sample size of self

static from_distribution(distr, n)[source]

Constructs a particle filter of n particles from distr

Basically samples n particles.

Note:

Does not copy samples drawn from distr. Make sure distr() copies the particles if necessary.

Parameters
  • distr (StateDistribution) – the distribution to approximate

  • n (int) – the number of particles to approximate with

Return type

ParticleFilter

Returns

a particle filter approximating distr with n particles
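
The note on copying matters when distr returns the same mutable object on every call. The following sketch uses a hypothetical sample_particles helper (not the package's constructor) to contrast a distribution that hands out a shared object with one that copies.

```python
import copy
from typing import Callable, List


def sample_particles(distr: Callable[[], dict], n: int) -> List[dict]:
    """Hypothetical helper: draws n states from distr without copying them."""
    return [distr() for _ in range(n)]


shared = {"pos": 0}

# This distribution returns the very same object on each call
aliasing = sample_particles(lambda: shared, 3)

# This distribution copies before returning, so particles are independent
copying = sample_particles(lambda: copy.deepcopy(shared), 3)

aliasing[0]["pos"] = 5  # changes every aliased particle and the source state
copying[0]["pos"] = 7  # changes only this one particle
```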

static from_particles(particles)[source]

Creates a particle filter from particles

Normalizes the weights in particles

Note:

Does not copy any of the particles, and particles may change in the future through belief updates

Parameters

particles (Iterable[Particle]) – the particles to create the filter from

Return type

ParticleFilter

Returns

a particle filter from given particles
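
Weight normalization can be sketched as follows. The normalize function and the plain (state, weight) tuples are assumptions made for this example; the package may normalize differently or lazily.

```python
from typing import Any, List, Tuple


def normalize(weighted: List[Tuple[Any, float]]) -> List[Tuple[Any, float]]:
    """Hypothetical helper: rescales weights so that they sum to one."""
    total = sum(w for _, w in weighted)
    return [(s, w / total) for s, w in weighted]


normalized = normalize([("a", 2.0), ("b", 6.0)])
```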

probability_of(s, equality_function=<built-in function eq>)[source]

Returns the probability of s, where states are compared using equality_function

Parameters
  • s (State) – the state of which we will compute the probability

  • equality_function (Callable[[State, State], bool]) – returns whether two states are the same, defaults to eq

Return type

float

Returns

a number 0 … 1
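
A minimal sketch of this computation, assuming the probability is the share of total weight held by particles that compare equal to s. probability_of_sketch is a hypothetical free function operating on (state, weight) tuples, not the method itself.

```python
import operator
from typing import Any, Callable, List, Tuple


def probability_of_sketch(
    particles: List[Tuple[Any, float]],
    s: Any,
    equality_function: Callable[[Any, Any], bool] = operator.eq,
) -> float:
    """Hypothetical: fraction of the total weight on states equal to s."""
    total = sum(w for _, w in particles)
    matching = sum(w for state, w in particles if equality_function(state, s))
    return matching / total


particles = [((0, 0), 0.5), ((0, 1), 0.25), ((1, 1), 0.25)]

# Default equality: only the particle (0, 1) matches
exact = probability_of_sketch(particles, (0, 1))

# Custom equality: states match when their first coordinate matches
same_row = probability_of_sketch(particles, (0, 9), lambda a, b: a[0] == b[0])
```

A custom equality function is useful when states carry fields that should be ignored in the comparison.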

class pomdp_belief_tracking.pf.particle_filter.Particle(state: State, weight: float)[source]

Bases: tuple

class pomdp_belief_tracking.pf.particle_filter.ParticleFilter(states)[source]

Bases: pomdp_belief_tracking.types.StateDistribution

A distribution from weighted particles

Creates a particle filter from provided states

Provides uniform weights to states

Parameters

states (Sequence[State]) – the particles

static total_weight(particles)[source]

Computes the total weight in particles

Parameters

particles (Iterable[Particle]) – particles to compute the total weight of

pomdp_belief_tracking.pf.particle_filter.apply(f, pf)[source]

Returns a new ParticleFilter with f applied on states in pf

Note: assumes f does not modify its input State; otherwise the input pf will be affected too.

Parameters
  • f (Callable[[State], State]) – the function to apply to particles

  • pf (ParticleFilter) – input starting particle filter / belief

Return type

ParticleFilter

Returns

the result of applying f onto pf
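
The behaviour described here can be sketched on plain (state, weight) tuples. apply_sketch is a hypothetical stand-in; it assumes that weights are carried over unchanged, which this documentation does not state explicitly.

```python
from typing import Any, Callable, List, Tuple


def apply_sketch(
    f: Callable[[Any], Any], particles: List[Tuple[Any, float]]
) -> List[Tuple[Any, float]]:
    """Hypothetical: maps f over the states, keeping each weight (assumed)."""
    return [(f(state), weight) for state, weight in particles]


belief = [((0, 0), 0.5), ((2, 1), 0.5)]

# f builds a new tuple rather than modifying its input, so belief is untouched
moved = apply_sketch(lambda s: (s[0] + 1, s[1]), belief)
```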

pomdp_belief_tracking.pf.particle_filter.effective_sample_size(weights, total_weight=None)[source]

Computes the “effective sample size” of the given weights

This value represents how “healthy” the underlying samples are. The lower this value, the fewer “real samples” are represented: the closer it comes to zero, the more degenerate the samples are.

See https://en.wikipedia.org/wiki/Effective_sample_size

Parameters
  • weights (Iterable[float]) – the weights of the samples

  • total_weight (Optional[float]) – total weight of all samples, requires extra computation if not given

Return type

float

Returns

the effective sample size of weights
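
One common definition, given on the linked Wikipedia page, is Kish's effective sample size: the squared sum of the weights divided by the sum of squared weights. The sketch below assumes that definition; ess_sketch is a hypothetical name and the package's own computation may differ.

```python
from typing import Iterable, Optional


def ess_sketch(weights: Iterable[float], total_weight: Optional[float] = None) -> float:
    """Kish's effective sample size: (sum w)^2 / sum(w^2). Assumed formula."""
    ws = list(weights)
    if total_weight is None:
        # The extra pass over the weights that a caller can avoid
        total_weight = sum(ws)
    return total_weight**2 / sum(w * w for w in ws)


# Uniform weights: every sample counts fully
uniform = ess_sketch([0.25, 0.25, 0.25, 0.25])

# All weight on one sample: effectively a single sample
degenerate = ess_sketch([1.0, 0.0, 0.0, 0.0])
```

Under this assumed formula, n equally weighted samples give a value of n, and the value shrinks as the weight concentrates on fewer samples.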

pomdp_belief_tracking.pf.rejection_sampling module

pomdp_belief_tracking.pf.importance_sampling module

Provides a general implementation of importance sampling:

pomdp_belief_tracking.pf.importance_sampling.general_importance_sample(proposal_distr, weight_func, particles)[source]

The particle filter implementation of IS

The underlying algorithm for importance sampling uses a ProposalDistribution and a weighting function to update a particle filter:

for sample, weight in particles:
    sample ~ proposal_distr(sample)
    weight <- weight * weight_func(sample)

Returns how long the update took in info with key “belief_update_runtime”

Parameters
Return type

Tuple[ParticleFilter, Dict[str, Any]]

Returns

a new particle set
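
The loop above might look roughly like this in plain Python. importance_sample_sketch, propose, and weigh are hypothetical names with simplified signatures (the real ProposalDistribution and WeightFunction protocols take additional arguments); only the info key "belief_update_runtime" is taken from this documentation.

```python
import random
import time
from typing import Any, Callable, Dict, List, Tuple


def importance_sample_sketch(
    propose: Callable[[Any], Any],
    weigh: Callable[[Any], float],
    particles: List[Tuple[Any, float]],
) -> Tuple[List[Tuple[Any, float]], Dict[str, Any]]:
    """Hypothetical: proposes a new state per particle and rescales its weight."""
    start = time.perf_counter()
    updated = []
    for state, weight in particles:
        proposal = propose(state)  # sample ~ proposal_distr(sample)
        updated.append((proposal, weight * weigh(proposal)))
    info = {"belief_update_runtime": time.perf_counter() - start}
    return updated, info


rng = random.Random(1)
particles = [(0, 1.0), (1, 1.0), (2, 1.0)]
new_particles, info = importance_sample_sketch(
    propose=lambda s: s + rng.choice([0, 1]),  # toy random walk
    weigh=lambda s: 1.0 if s % 2 == 0 else 0.5,  # toy preference for even states
    particles=particles,
)
```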

Our belief update version of importance sampling calls this function with the appropriate parameters:

pomdp_belief_tracking.pf.importance_sampling.importance_sample(transition_func, observation_model, n, initial_state_distribution, a, o)[source]

Applies general_importance_sample() on POMDPs

Here the transition_func is used to propose next states, which are weighted according to the observation_model given o. If initial_state_distribution is not a particle filter (with a given size), then we sample n particles with weight 1 to start IS. Otherwise we use the particles in the PF.

n is necessary when initial_state_distribution is not a ParticleFilter. Otherwise ignored.

Parameters
Return type

Tuple[ParticleFilter, Dict[str, Any]]

Returns

updated belief
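
In POMDP terms, each particle is pushed through the transition function and reweighted by the likelihood of the observation. The sketch below assumes a deterministic toy transition and an observation model with the (state, action, next state, observation) signature documented for create_sequential_importance_sampling; every function name in it is hypothetical.

```python
from typing import List, Tuple


def transition(s: int, a: int) -> int:
    """Toy deterministic transition: the action shifts the state."""
    return s + a


def observation_model(s: int, a: int, s_next: int, o: int) -> float:
    """Toy likelihood: the observation matches the next state 90% of the time."""
    return 0.9 if o == s_next else 0.1


def pomdp_importance_sample(
    particles: List[Tuple[int, float]], a: int, o: int
) -> List[Tuple[int, float]]:
    """Hypothetical: proposes with transition, weights with observation_model."""
    updated = []
    for s, w in particles:
        s_next = transition(s, a)
        updated.append((s_next, w * observation_model(s, a, s_next, o)))
    return updated


belief = [(0, 1.0), (1, 1.0)]  # particles start with weight 1
posterior = pomdp_importance_sample(belief, a=1, o=2)
```

After observing o = 2, the particle that moved to state 2 carries nine times the weight of the one that moved to state 1.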

This belief update is best created through our construction function, which provides sane defaults (alternatively, you can apply partial yourself):

pomdp_belief_tracking.pf.importance_sampling.create_importance_sampling(transition_func, observation_model, n)[source]

Partial function that returns a regular IS belief update

A simple wrapper around importance_sample()

Here the transition_func is used to propose next states, which are weighted according to the observation_model. If the belief being updated is not a particle filter (with a given size), then we sample n particles with weight 1 to start IS. Otherwise we use the particles in the PF.

n is necessary when initial_state_distribution is not a ParticleFilter. Otherwise ignored.

Parameters
Return type

BeliefUpdate

Returns

importance_sample() as a pomdp_belief_tracking.types.BeliefUpdate

Sequential importance sampling, the application of the belief update over multiple time steps, often involves resample() to avoid particle degeneracy. When to resample is not straightforward, so we provide a general condition protocol:

class pomdp_belief_tracking.pf.importance_sampling.ResampleCondition(*args, **kwds)[source]

The signature of a resample condition

__call__(pf)[source]

Inspects pf and decides whether it is time to re-sample

Parameters

pf (ParticleFilter) – the particle filter to potentially resample

Return type

bool

Returns

True if pf should be resampled

Provided implementations:

ineffective_sample_size

Returns whether the sample size of pf is lower than minimal_size

Lastly, we provide a factory function that combines importance sampling with resampling:

pomdp_belief_tracking.pf.importance_sampling.create_sequential_importance_sampling(resample_condition, transition_func, observation_model, n=None)[source]

Main entry point of this module to create importance sampling update

A simple wrapper combining resample() (if resample_condition is met) with importance_sample() (created by calling create_importance_sampling())

n is necessary when initial_state_distribution is not a ParticleFilter. Otherwise ignored.

Parameters
  • resample_condition (ResampleCondition) – when to resample (called before IS)

  • transition_func (TransitionFunction) – the transition function to propose particles

  • observation_model (Callable[[State, Action, State, Observation], float]) – the function to weight the new particles

  • n (Optional[int]) – number of desired particles

Return type

BeliefUpdate
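
The combination described above, checking the condition first and then running importance sampling, can be sketched as a closure. All names here are hypothetical, particles are plain (state, weight) tuples, the effective sample size uses Kish's formula as an assumption, and resampled particles are given weight 1 as a further assumption.

```python
import random


def ess(particles):
    """Kish's effective sample size of the particle weights (assumed formula)."""
    total = sum(w for _, w in particles)
    return total**2 / sum(w * w for _, w in particles)


def resample_sketch(particles, n, rng):
    """Draws n states in proportion to their weight; new weights are 1 (assumed)."""
    states = [s for s, _ in particles]
    weights = [w for _, w in particles]
    return [(s, 1.0) for s in rng.choices(states, weights=weights, k=n)]


def create_sis_sketch(should_resample, transition, observation_model, rng):
    """Hypothetical factory: returns an update that resamples before IS if needed."""

    def update(particles, a, o):
        if should_resample(particles):  # condition is checked before IS
            particles = resample_sketch(particles, len(particles), rng)
        updated = []
        for s, w in particles:
            s_next = transition(s, a)
            updated.append((s_next, w * observation_model(s, a, s_next, o)))
        return updated

    return update


update = create_sis_sketch(
    should_resample=lambda ps: ess(ps) < 2.2,
    transition=lambda s, a: s + a,
    observation_model=lambda s, a, s_next, o: 0.9 if s_next == o else 0.1,
    rng=random.Random(0),
)

belief = [(0, 1.0), (0, 1.0), (1, 1.0), (1, 1.0)]
for _ in range(3):
    belief = update(belief, 0, 1)
```

In this toy run the weights decay for two steps; before the third step the effective sample size drops below the threshold, so the particles are resampled and their weights reset.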

class pomdp_belief_tracking.pf.importance_sampling.ResampleCondition(*args, **kwds)[source]

Bases: typing_extensions.Protocol

class pomdp_belief_tracking.pf.importance_sampling.WeightFunction(*args, **kwds)[source]

Bases: typing_extensions.Protocol

Signature of a weighting function in general_importance_sample()

__call__(proposal, sample_ctx, info)[source]

Weights a state -> proposal transition under sample_ctx

Parameters
  • proposal (State) – proposed (updated) sample

  • sample_ctx (Any) – context around proposal

  • info (Dict[str, Any]) – global information stored during importance sampling

Return type

float

Returns

a weight, with 0 <= weight <= 1

pomdp_belief_tracking.pf.importance_sampling.ineffective_sample_size(minimal_size, pf)[source]

Returns whether the sample size of pf is lower than minimal_size

With minimal_size fixed, this implements the ResampleCondition protocol. Asserts that minimal_size > 0.

Calls effective_sample_size() under the hood

Parameters
  • minimal_size (float) – the required sample size for this to return False (> 0)

  • pf (ParticleFilter) – the particle filter to test the sample size of

Returns

True if minimal_size > sample size of pf
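
One way to fix minimal_size is functools.partial, which turns the two-argument function into a one-argument condition. The sketch below is self-contained and hypothetical: ineffective_sketch takes a list of weights rather than a ParticleFilter, and it assumes Kish's formula for the effective sample size.

```python
from functools import partial
from typing import List


def ineffective_sketch(minimal_size: float, weights: List[float]) -> bool:
    """Hypothetical: True if the effective sample size is below minimal_size."""
    assert minimal_size > 0
    effective = sum(weights) ** 2 / sum(w * w for w in weights)  # assumed formula
    return minimal_size > effective


# Binding minimal_size leaves a callable that takes only the weights
needs_resampling = partial(ineffective_sketch, 2.5)

healthy = needs_resampling([0.25, 0.25, 0.25, 0.25])  # effective size 4
collapsed = needs_resampling([0.97, 0.01, 0.01, 0.01])  # effective size near 1
```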

pomdp_belief_tracking.pf.importance_sampling.resample(pf, n)[source]

Samples n particles from pf

Parameters
  • pf (ParticleFilter) – incoming particle filter

  • n (int) – number of desired samples in returned PF

Return type

ParticleFilter

Returns

the resulting particle filter of resampling pf
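
A simple multinomial resampling scheme is sketched below. It is an assumption that the package resamples this way; resample_sketch is a hypothetical function on (state, weight) tuples that uses the standard library's random.choices and assigns uniform weights to the result.

```python
import random
from typing import Any, List, Tuple


def resample_sketch(
    particles: List[Tuple[Any, float]], n: int, rng: random.Random
) -> List[Tuple[Any, float]]:
    """Hypothetical: draws n states by weight and gives each weight 1 / n."""
    states = [s for s, _ in particles]
    weights = [w for _, w in particles]
    drawn = rng.choices(states, weights=weights, k=n)
    return [(s, 1.0 / n) for s in drawn]


rng = random.Random(42)
skewed = [("a", 0.9), ("b", 0.1), ("c", 0.0)]
resampled = resample_sketch(skewed, 5, rng)
```

States with zero weight are never drawn, and heavily weighted states tend to appear several times, which is how resampling counters degeneracy.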

pomdp_belief_tracking.pf.types module

Some additional (shared) particle filtering types

class pomdp_belief_tracking.pf.types.ProposalDistribution(*args, **kwds)[source]

Bases: typing_extensions.Protocol

The signature for the proposal distribution for sampling

Module contents

Particle Filters and their update functions

Particle filters are approximations of distributions. They represent them through Particle, a State with a relative weight.

In this module we implement such a ParticleFilter and some functions to update it.