python yield iterate

To understand what yield does, you must understand what generators are. And before generators come iterables.

Iterables

When you create a list, you can read its items one by one. Reading its items one by one is called iteration:

>>> mylist = [1, 2, 3]
>>> for i in mylist:
...    print(i)
1
2
3

mylist is an iterable. When you use a list comprehension, you create a list, and so an iterable:

>>> mylist = [x*x for x in range(3)]
>>> for i in mylist:
...    print(i)
0
1
4

Everything you can use "for... in..." on is an iterable: lists, strings, files…

These iterables are handy because you can read them as many times as you wish, but you store all the values in memory, and this is not always what you want when you have a lot of values.

Generators

Generators are iterators, a kind of iterable you can only iterate over once. That is because they do not store all the values in memory; they generate the values on the fly:

>>> mygenerator = (x*x for x in range(3))
>>> for i in mygenerator:
...    print(i)
0
1
4

It is just the same except you used () instead of []. But you cannot perform for i in mygenerator a second time, since generators can only be used once: they calculate 0, then forget about it and calculate 1, and finish by calculating 4, one by one.
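To see the "only once" rule for yourself, here is a minimal sketch (the variable names are just for illustration). It reads the same generator twice and compares that with a list:

```python
squares = (x * x for x in range(3))   # generator expression

first_pass = list(squares)    # consumes every value the generator produces
second_pass = list(squares)   # the generator is already exhausted

print(first_pass)    # [0, 1, 4]
print(second_pass)   # []

# A list, by contrast, can be read again and again.
squares_list = [x * x for x in range(3)]
print(list(squares_list) == list(squares_list))   # True
```

The second pass does not fail; it simply finds nothing left to read.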

Yield

Yield is a keyword that is used like return, except the function will return a generator.

>>> def createGenerator():
...    mylist = range(3)
...    for i in mylist:
...        yield i*i
...
>>> mygenerator = createGenerator() # create a generator
>>> print(mygenerator) # mygenerator is an object!
<generator object createGenerator at 0xb7555c34>
>>> for i in mygenerator:
...     print(i)
0
1
4

Here it’s a useless example, but it’s handy when you know your function will return a huge set of values that you will only need to read once.

To master yield, you must understand that when you call the function, the code you have written in the function body does not run. The function only returns the generator object. This is a bit tricky 🙂

Then, your code will be run each time the for uses the generator.

Now the hard part:

The first time the for calls the generator object created from your function, it will run the code in your function from the beginning until it hits yield, then it’ll return the first value of the loop. Then, each other call will run the loop you have written in the function one more time, and return the next value, until there is no value to return.

The generator is considered empty once the function runs but does not hit yield anymore. That can be because the loop has come to an end, or because an "if/else" condition is no longer satisfied.
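You can watch this happen by driving a generator by hand with next(), which is what the for loop does for you behind the scenes. A small sketch, where the print calls are only there to show when the body actually runs:

```python
def create_generator():
    print("body starts")      # runs on the first next(), not when the function is called
    for i in range(3):
        yield i * i           # execution pauses here after handing out each value
    print("body ends")        # runs once the loop is over and no yield is left

gen = create_generator()      # nothing is printed yet
print(next(gen))              # prints "body starts", then 0
print(next(gen))              # 1
print(next(gen))              # 4

try:
    next(gen)                 # prints "body ends", then raises StopIteration
except StopIteration:
    print("generator is empty")
```

A for loop catches that StopIteration silently, which is why you never see it in the earlier examples.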

Your code explained

Generator:

# Here you create the method of the node object that will return the generator
def _get_child_candidates(self, distance, min_dist, max_dist):

  # Here is the code that will be called each time you use the generator object:

  # If there is still a child of the node object on its left
  # AND if distance is ok, return the next child
  if self._leftchild and distance - max_dist < self._median:
      yield self._leftchild

  # If there is still a child of the node object on its right
  # AND if distance is ok, return the next child
  if self._rightchild and distance + max_dist >= self._median:
      yield self._rightchild

  # If the function arrives here, the generator will be considered empty
  # there is no more than two values: the left and the right children

Caller:

# Create an empty list and a list with the current object reference
result, candidates = list(), [self]

# Loop on candidates (they contain only one element at the beginning)
while candidates:

    # Get the last candidate and remove it from the list
    node = candidates.pop()

    # Get the distance between obj and the candidate
    distance = node._get_dist(obj)

    # If distance is ok, then you can fill the result
    if distance <= max_dist and distance >= min_dist:
        result.extend(node._values)

    # Add the children of the candidate in the candidates list
    # so the loop will keep running until it will have looked
    # at all the children of the children of the children, etc. of the candidate
    candidates.extend(node._get_child_candidates(distance, min_dist, max_dist))

return result

This code contains several smart parts:

  • The loop iterates on a list but the list expands while the loop is being iterated 🙂 It’s a concise way to go through all these nested data even if it’s a bit dangerous since you can end up with an infinite loop. In this case, candidates.extend(node._get_child_candidates(distance, min_dist, max_dist)) exhausts all the values of the generator, but while keeps creating new generator objects which will produce different values from the previous ones since it’s not applied on the same node.
  • The extend() method is a list object method that expects an iterable and adds its values to the list.

Usually we pass a list to it:

>>> a = [1, 2]
>>> b = [3, 4]
>>> a.extend(b)
>>> print(a)
[1, 2, 3, 4]

But in your code it gets a generator, which is good because:

  1. You don’t need to read the values twice.
  2. You can have a lot of children and you don’t want them all stored in memory.

And it works because Python does not care if the argument of a method is a list or not. Python expects iterables, so it will work with strings, lists, tuples and generators! This is called duck typing and is one of the reasons why Python is so cool. But this is another story, for another question…
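A quick sketch of that flexibility, with made-up values: the same extend() call accepts each kind of iterable.

```python
items = [1, 2]
items.extend([3, 4])                   # a list
items.extend((5, 6))                   # a tuple
items.extend("ab")                     # a string: one item per character
items.extend(n * 10 for n in (7, 8))   # a generator expression
print(items)
# [1, 2, 3, 4, 5, 6, 'a', 'b', 70, 80]
```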

You can stop here, or read a little bit more to see an advanced use of a generator:

Controlling a generator's exhaustion

>>> class Bank(): # let's create a bank, building ATMs
...    crisis = False
...    def create_atm(self):
...        while not self.crisis:
...            yield "$100"
>>> hsbc = Bank() # when everything's ok the ATM gives you as much as you want
>>> corner_street_atm = hsbc.create_atm()
>>> print(next(corner_street_atm))
$100
>>> print(next(corner_street_atm))
$100
>>> print([next(corner_street_atm) for cash in range(5)])
['$100', '$100', '$100', '$100', '$100']
>>> hsbc.crisis = True # crisis is coming, no more money!
>>> print(next(corner_street_atm))
Traceback (most recent call last):
  ...
StopIteration
>>> wall_street_atm = hsbc.create_atm() # it's even true for new ATMs
>>> print(next(wall_street_atm))
Traceback (most recent call last):
  ...
StopIteration
>>> hsbc.crisis = False # trouble is, even post-crisis the ATM remains empty
>>> print(next(corner_street_atm))
Traceback (most recent call last):
  ...
StopIteration
>>> brand_new_atm = hsbc.create_atm() # build a new one to get back in business
>>> for cash in brand_new_atm:
...    print(cash)
$100
$100
$100
$100
$100
$100
$100
$100
$100
...

It can be useful for various things like controlling access to a resource.

Itertools, your best friend

The itertools module contains special functions to manipulate iterables. Ever wish to duplicate a generator? Chain two generators? Group values in a nested list with a one-liner? Map / Zip without creating another list?

Then just import itertools.

An example? Let’s see the possible orders of arrival for a 4-horse race:

>>> import itertools
>>> horses = [1, 2, 3, 4]
>>> races = itertools.permutations(horses)
>>> print(races)
<itertools.permutations object at 0xb754f1dc>
>>> print(list(itertools.permutations(horses)))
[(1, 2, 3, 4),
 (1, 2, 4, 3),
 (1, 3, 2, 4),
 (1, 3, 4, 2),
 (1, 4, 2, 3),
 (1, 4, 3, 2),
 (2, 1, 3, 4),
 (2, 1, 4, 3),
 (2, 3, 1, 4),
 (2, 3, 4, 1),
 (2, 4, 1, 3),
 (2, 4, 3, 1),
 (3, 1, 2, 4),
 (3, 1, 4, 2),
 (3, 2, 1, 4),
 (3, 2, 4, 1),
 (3, 4, 1, 2),
 (3, 4, 2, 1),
 (4, 1, 2, 3),
 (4, 1, 3, 2),
 (4, 2, 1, 3),
 (4, 2, 3, 1),
 (4, 3, 1, 2),
 (4, 3, 2, 1)]
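
The other wishes mentioned above (chaining, duplicating) can be sketched in the same way. The countdown helper below is hypothetical and only there to have a generator to play with; chain, tee and islice are standard itertools functions.

```python
import itertools

def countdown(n):
    """Hypothetical helper: yield n, n-1, ..., 1."""
    while n > 0:
        yield n
        n -= 1

# Chain two generators into a single stream.
chained = list(itertools.chain(countdown(3), countdown(2)))
print(chained)               # [3, 2, 1, 2, 1]

# "Duplicate" a generator: tee() returns independent iterators
# over the same underlying values.
a, b = itertools.tee(countdown(3))
copy_a, copy_b = list(a), list(b)
print(copy_a, copy_b)        # [3, 2, 1] [3, 2, 1]

# Take only the first few permutations without building all 24.
first_three = list(itertools.islice(itertools.permutations([1, 2, 3, 4]), 3))
print(first_three)           # [(1, 2, 3, 4), (1, 2, 4, 3), (1, 3, 2, 4)]
```

After calling tee(), use only the copies; advancing the original generator directly would make the copies skip values.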

Understanding the inner mechanisms of iteration

Iteration is a process implying iterables (implementing the __iter__() method) and iterators (implementing the __next__() method). Iterables are any objects you can get an iterator from. Iterators are objects that let you iterate on iterables.
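
Here is a minimal sketch of both roles. The class names and the manual_for helper are invented for this illustration; manual_for spells out roughly what a for loop does with iter() and next().

```python
class Countdown:
    """Hypothetical iterable: hands out a fresh iterator on each request."""
    def __init__(self, start):
        self.start = start

    def __iter__(self):
        return CountdownIterator(self.start)


class CountdownIterator:
    """Hypothetical iterator: remembers the position and produces values."""
    def __init__(self, current):
        self.current = current

    def __iter__(self):
        return self                 # iterators are iterables too

    def __next__(self):
        if self.current <= 0:
            raise StopIteration     # signals that nothing is left
        value = self.current
        self.current -= 1
        return value


def manual_for(iterable):
    """Roughly what `for item in iterable` does under the hood."""
    collected = []
    iterator = iter(iterable)       # calls iterable.__iter__()
    while True:
        try:
            item = next(iterator)   # calls iterator.__next__()
        except StopIteration:
            break
        collected.append(item)
    return collected


print(manual_for(Countdown(3)))     # [3, 2, 1]
print(list(Countdown(3)))           # [3, 2, 1]
```

Because Countdown builds a new iterator each time, it can be looped over repeatedly, unlike a generator object.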

More about it in this article about how for loops work.

python parallel programming

An introduction to parallel programming

using Python’s multiprocessing module

— written by Sebastian Raschka June 20, 2014

CPUs with multiple cores have become the standard in modern computer architectures, and we find them not only in supercomputer facilities but also in our desktop machines at home and in our laptops; even Apple’s iPhone 5S got a 1.3 GHz dual-core processor in 2013.

However, the default Python interpreter was designed with simplicity in mind and has a thread-safety mechanism, the so-called "GIL" (Global Interpreter Lock). In order to prevent conflicts between threads, it allows only one thread to execute Python bytecode at a time, so CPU-bound Python code effectively runs serially even when it is spread over several threads.

In this introduction to Python’s multiprocessing module, we will see how we can spawn multiple subprocesses to avoid some of the GIL’s disadvantages.

Multi-Threading vs. Multi-Processing

Depending on the application, two common approaches in parallel programming are to run code via threads or via multiple processes. If we submit "jobs" to different threads, those jobs can be pictured as "sub-tasks" of a single process, and those threads will usually have access to the same memory areas (i.e., shared memory). This approach can easily lead to conflicts in case of improper synchronization, for example, if threads are writing to the same memory location at the same time.
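
As a minimal sketch of what proper synchronization looks like, the snippet below lets four threads increment one shared counter. It uses the standard threading module, which is not otherwise covered in this article, and the names counter and add_many are chosen just for this illustration. The lock makes each read-modify-write step exclusive, so no update is lost.

```python
import threading

counter = 0                  # shared state, visible to every thread
lock = threading.Lock()

def add_many(n):
    """Increment the shared counter n times, one locked step at a time."""
    global counter
    for _ in range(n):
        with lock:           # only one thread at a time may update the counter
            counter += 1

threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)               # 40000
```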

A safer approach (although it comes with additional overhead due to the communication between separate processes) is to submit multiple processes to completely separate memory locations (i.e., distributed memory): every process runs completely independently of the others.

Here, we will take a look at Python’s multiprocessing module and how we can use it to submit multiple processes that can run independently from each other in order to make best use of our CPU cores.

Introduction to the multiprocessing module

The multiprocessing module in Python’s Standard Library has a lot of powerful features. If you want to read about all the nitty-gritty tips, tricks, and details, I would recommend using the official documentation as an entry point.

In the following sections, I want to provide a brief overview of different approaches to show how the multiprocessing module can be used for parallel programming.

The Process class

The most basic approach is probably to use the Process class from the multiprocessing module.
Here, we will use a simple random string generator to generate 4 random strings in parallel processes.

import multiprocessing as mp
import random
import string

# Define an output queue
output = mp.Queue()

# Define an example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                    string.ascii_lowercase
                    + string.ascii_uppercase
                    + string.digits)
               for i in range(length))
    output.put(rand_str)

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Wait for the processes to finish
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)
['yzQfA', 'PQpqM', 'SHZYV', 'PSNkD']

How to retrieve results in a particular order

The order of the obtained results does not necessarily have to match the order of the processes (in the processes list). Since we eventually use the .get() method to retrieve the results from the Queue sequentially, the order in which the processes finished determines the order of our results.
E.g., if the second process has finished just before the first process, the order of the strings in the results list could have also been ['PQpqM', 'yzQfA', 'SHZYV', 'PSNkD'] instead of ['yzQfA', 'PQpqM', 'SHZYV', 'PSNkD'].

If our application required us to retrieve results in a particular order, one possibility would be to refer to the processes’ ._identity attribute. In this case, we could also simply use the values from our range object as position argument. The modified code would be:

# Define an example function
def rand_string(length, pos, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                    string.ascii_lowercase
                    + string.ascii_uppercase
                    + string.digits)
                for i in range(length))
    output.put((pos, rand_str))

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(4)]

And the retrieved results would be tuples, for example, [(0, 'KAQo6'), (1, '5lUya'), (2, 'nj6Q0'), (3, 'QQvLr')]
or [(1, '5lUya'), (3, 'QQvLr'), (0, 'KAQo6'), (2, 'nj6Q0')]

To make sure that we retrieved the results in order, we could simply sort the results and optionally get rid of the position argument:

results.sort()
results = [r[1] for r in results]
['KAQo6', '5lUya', 'nj6Q0', 'QQvLr']
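
The tagging-and-sorting idea can be tried out without spawning any processes. In the sketch below, a plain queue.Queue stands in for mp.Queue and a shuffled list simulates processes finishing in an arbitrary order; both are assumptions made only so that the snippet runs anywhere, and the labels are made up.

```python
import queue
import random

output = queue.Queue()               # stand-in for mp.Queue in this sketch
labels = ["a", "b", "c", "d"]        # hypothetical results, one per position

# Simulate workers finishing in an arbitrary order.
arrival_order = list(range(len(labels)))
random.shuffle(arrival_order)
for pos in arrival_order:
    output.put((pos, labels[pos]))   # tag each result with its position

# Retrieve in arrival order, then restore the submission order.
results = [output.get() for _ in labels]
results.sort()                       # tuples sort by their first element, the position
ordered = [value for _, value in results]
print(ordered)                       # ['a', 'b', 'c', 'd']
```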

A simpler way to maintain an ordered list of results is to use the Pool.apply and Pool.map functions, which we will discuss in the next section.

The Pool class

Another and more convenient approach for simple parallel-processing tasks is provided by the Pool class.

There are four methods that are particularly interesting:

  • Pool.apply
  • Pool.map
  • Pool.apply_async
  • Pool.map_async

The Pool.apply and Pool.map methods are basically equivalents of Python’s built-in apply and map functions (apply exists only in Python 2; in Python 3, we simply call the function directly).

Before we come to the async variants of the Pool methods, let us take a look at a simple example using Pool.apply and Pool.map. Here, we will set the number of processes to 4, which means that the Pool class will only allow 4 processes running at the same time.
This time, we will use a simple cube-function that returns the cube of an input number.

def cube(x):
    return x**3
pool = mp.Pool(processes=4)
results = [pool.apply(cube, args=(x,)) for x in range(1,7)]
print(results)
[1, 8, 27, 64, 125, 216]
pool = mp.Pool(processes=4)
results = pool.map(cube, range(1,7))
print(results)
[1, 8, 27, 64, 125, 216]

Pool.map and Pool.apply will block the main program until the result is ready, which is quite useful if we want to obtain results in a particular order for certain applications.
In contrast, the async variants submit all tasks at once and return immediately with a result handle, so the tasks themselves may finish in any order; for example, the cube of 2 could be computed before the cube of 1. Note that if we call get on the handles in submission order, as in the example below, the collected list still follows that order; only the completion order of the tasks can differ.

One more difference is that we need to use the get method after the apply_async() call in order to obtain the return values of the finished processes.

pool = mp.Pool(processes=4)
results = [pool.apply_async(cube, args=(x,)) for x in range(1,7)]
output = [p.get() for p in results]
print(output)
[1, 8, 27, 64, 125, 216]
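
A slightly more defensive variant of the snippet above is sketched below, under two assumptions that are not part of the original example. First, the pool is created under an `if __name__ == "__main__":` guard, which is required on platforms that start worker processes by re-importing the main module (e.g., Windows). Second, Pool.imap_unordered is used to show results arriving in completion order. The function name run_pool_demo is made up for this sketch.

```python
import multiprocessing as mp

def cube(x):
    return x ** 3

def run_pool_demo():
    # The context manager shuts the worker processes down on exit.
    with mp.Pool(processes=4) as pool:
        handles = [pool.apply_async(cube, args=(x,)) for x in range(1, 7)]
        in_order = [h.get() for h in handles]                        # submission order
        as_finished = list(pool.imap_unordered(cube, range(1, 7)))   # completion order
    return in_order, as_finished

if __name__ == "__main__":
    in_order, as_finished = run_pool_demo()
    print(in_order)              # [1, 8, 27, 64, 125, 216]
    print(sorted(as_finished))   # same values; the unsorted order may vary
```

Calling get on each handle in turn returns the values in submission order, no matter which worker finished first.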

Kernel density estimation as benchmarking function

In the following section, I want to do a simple comparison of a serial vs. multiprocessing approach where I will use a slightly more complex function than the cube example that we have been using above.

Here, I define a function for performing a Kernel density estimation for probability density functions using the Parzen-window technique.
I don’t want to go into much detail about the theory of this technique, since we are mostly interested in seeing how multiprocessing can be used for performance improvements, but you are welcome to read my more detailed article about the Parzen-window method.

import numpy as np

def parzen_estimation(x_samples, point_x, h):
    """
    Implementation of a hypercube kernel for Parzen-window estimation.

    Keyword arguments:
        x_samples: training samples, one d-dimensional sample per row
        point_x: point x for density estimation, 'd x 1'-dimensional numpy array
        h: window width

    Returns the predicted pdf as float.

    """
    k_n = 0
    for row in x_samples:
        x_i = (point_x - row[:,np.newaxis]) / (h)
        for row in x_i:
            if np.abs(row) > (1/2):
                break
        else: # "completion-else"*
            k_n += 1
    return (k_n / len(x_samples)) / (h**point_x.shape[1])

A quick note about the "completion else": In the past, I received some comments about whether I used this for-else combination intentionally or if it happened by mistake. This is a legitimate question, since this "completion-else" is rarely used (that’s what I call it; I am not aware of a more "official" name for the else in this context; if there is one, please let me know).
I have a more detailed explanation in one of my blog posts, but in a nutshell: in contrast to a conditional else (in combination with if-statements), the "completion else" is only executed if the preceding code block (here the for-loop) has run to completion, i.e., without hitting a break.
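
A minimal sketch of this behavior, using a hypothetical helper all_within that mirrors the inner loop of the function above:

```python
def all_within(values, limit):
    """Return True if every value lies within [-limit, limit]."""
    for v in values:
        if abs(v) > limit:
            break            # leaving the loop early skips the else branch
    else:
        return True          # reached only if the loop finished without a break
    return False

print(all_within([0.1, -0.3, 0.2], 0.5))   # True
print(all_within([0.1, 0.9, 0.2], 0.5))    # False
print(all_within([], 0.5))                 # True: an empty loop also runs to completion
```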


The Parzen-window method in a nutshell

So, what does this Parzen-window function do? The answer in a nutshell: It counts points in a defined region (the so-called window), and divides the number of those points inside by the number of total points to estimate the probability of a single point being in a certain region.

Below is a simple example where our Parzen-window is represented by a hypercube centered at the origin, and we want to get an estimate of the probability for a point being in the center of the plot based on this hypercube.

from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import numpy as np
from itertools import product, combinations
fig = plt.figure(figsize=(7,7))
ax = fig.add_subplot(111, projection='3d')
ax.set_aspect("equal")

# Plot Points

# samples within the cube
X_inside = np.array([[0,0,0],[0.2,0.2,0.2],[0.1, -0.1, -0.3]])

X_outside = np.array([[-1.2,0.3,-0.3],[0.8,-0.82,-0.9],[1, 0.6, -0.7],
                  [0.8,0.7,0.2],[0.7,-0.8,-0.45],[-0.3, 0.6, 0.9],
                  [0.7,-0.6,-0.8]])

for row in X_inside:
    ax.scatter(row[0], row[1], row[2], color="r", s=50, marker='^')

for row in X_outside:
    ax.scatter(row[0], row[1], row[2], color="k", s=50)

# Plot Cube
h = [-0.5, 0.5]
for s, e in combinations(np.array(list(product(h,h,h))), 2):
    if np.sum(np.abs(s-e)) == h[1]-h[0]:
        ax.plot3D(*zip(s,e), color="g")

ax.set_xlim(-1.5, 1.5)
ax.set_ylim(-1.5, 1.5)
ax.set_zlim(-1.5, 1.5)

plt.show()

point_x = np.array([[0],[0],[0]])
X_all = np.vstack((X_inside,X_outside))

print('p(x) =', parzen_estimation(X_all, point_x, h=1))
p(x) = 0.3
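
The same estimate can be reproduced without NumPy, which makes the counting step explicit. This is only a sketch: parzen_hypercube is a name made up here, and it takes plain lists instead of arrays. The sample points are the ten points from the plot above.

```python
def parzen_hypercube(samples, point, h):
    """Count the samples inside a hypercube of side h centered at point,
    then normalize by the number of samples and the cube volume h**d."""
    d = len(point)
    inside = sum(
        1 for s in samples
        if all(abs(p - x) / h <= 0.5 for p, x in zip(point, s))
    )
    return inside / len(samples) / h ** d

samples = [
    [0, 0, 0], [0.2, 0.2, 0.2], [0.1, -0.1, -0.3],             # inside the cube
    [-1.2, 0.3, -0.3], [0.8, -0.82, -0.9], [1, 0.6, -0.7],
    [0.8, 0.7, 0.2], [0.7, -0.8, -0.45], [-0.3, 0.6, 0.9],
    [0.7, -0.6, -0.8],                                         # outside the cube
]
density = parzen_hypercube(samples, [0, 0, 0], h=1)
print('p(x) =', density)   # 3 of the 10 points are inside, so p(x) = 0.3
```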

Sample data and timeit benchmarks

In the section below, we will create a random dataset from a bivariate Gaussian distribution with a mean vector centered at the origin and an identity matrix as covariance matrix.

import numpy as np

np.random.seed(123)

# Generate random 2D-patterns
mu_vec = np.array([0,0])
cov_mat = np.array([[1,0],[0,1]])
x_2Dgauss = np.random.multivariate_normal(mu_vec, cov_mat, 10000)

The expected probability density at the center of the distribution is ~ 0.15915, as we can see below.
And our goal here is to use the Parzen-window approach to predict this density based on the sample data set that we have created above.

In order to make a "good" prediction via the Parzen-window technique, it is – among other things – crucial to select an appropriate window width. Here, we will use multiple processes to predict the density at the center of the bivariate Gaussian distribution using different window widths.

from scipy.stats import multivariate_normal
var = multivariate_normal(mean=[0,0], cov=[[1,0],[0,1]])
print('actual probability density:', var.pdf([0,0]))
actual probability density: 0.159154943092
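
This value can also be checked by hand: at its mean, a d-dimensional Gaussian has the density 1 / sqrt((2π)^d · det(Σ)), which for d = 2 and an identity covariance matrix reduces to 1 / (2π). A short sketch, where the helper name is chosen only for this illustration:

```python
import math

def gaussian_density_at_mean(cov_det, dim):
    """Density of a multivariate normal distribution evaluated at its own mean."""
    return 1.0 / math.sqrt((2 * math.pi) ** dim * cov_det)

density = gaussian_density_at_mean(cov_det=1.0, dim=2)
print(round(density, 5))   # 0.15915
```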

Benchmarking functions

Below, we will set up benchmarking functions for our serial and multiprocessing approach that we can pass to our timeit benchmark function.
We will be using the Pool.apply_async function to take advantage of firing up processes simultaneously: Here, we don’t care about the order in which the results for the different window widths are computed, we just need to associate each result with the input window width.
Thus, we add a little tweak to our Parzen-density-estimation function by returning a tuple of 2 values: the window width and the estimated density, which will allow us to sort our list of results later.

def parzen_estimation(x_samples, point_x, h):
    k_n = 0
    for row in x_samples:
        x_i = (point_x - row[:,np.newaxis]) / (h)
        for row in x_i:
            if np.abs(row) > (1/2):
                break
        else: # "completion-else"*
            k_n += 1
    return (h, (k_n / len(x_samples)) / (h**point_x.shape[1]))

def serial(samples, x, widths):
    return [parzen_estimation(samples, x, w) for w in widths]

def multiprocess(processes, samples, x, widths):
    pool = mp.Pool(processes=processes)
    results = [pool.apply_async(parzen_estimation, args=(samples, x, w)) for w in widths]
    results = [p.get() for p in results]
    results.sort() # to sort the results by input window width
    return results

Just to get an idea what the results would look like (i.e., the predicted densities for different window widths):

widths = np.arange(0.1, 1.3, 0.1)
point_x = np.array([[0],[0]])
results = []

results = multiprocess(4, x_2Dgauss, point_x, widths)

for r in results:
    print('h = %s, p(x) = %s' %(r[0], r[1]))
h = 0.1, p(x) = 0.016
h = 0.2, p(x) = 0.0305
h = 0.3, p(x) = 0.045
h = 0.4, p(x) = 0.06175
h = 0.5, p(x) = 0.078
h = 0.6, p(x) = 0.0911666666667
h = 0.7, p(x) = 0.106
h = 0.8, p(x) = 0.117375
h = 0.9, p(x) = 0.132666666667
h = 1.0, p(x) = 0.1445
h = 1.1, p(x) = 0.157090909091
h = 1.2, p(x) = 0.1685

Based on the results, we can say that the best window width would be h=1.1, since the estimated result is closest to the actual result ~0.15915.
Thus, for the benchmark, let us create 100 evenly spaced window widths in the range of 1.0 to 1.2.

widths = np.linspace(1.0, 1.2 , 100)
import timeit

mu_vec = np.array([0,0])
cov_mat = np.array([[1,0],[0,1]])
n = 10000

x_2Dgauss = np.random.multivariate_normal(mu_vec, cov_mat, n)

benchmarks = []

benchmarks.append(timeit.Timer('serial(x_2Dgauss, point_x, widths)',
        'from __main__ import serial, x_2Dgauss, point_x, widths').timeit(number=1))

benchmarks.append(timeit.Timer('multiprocess(2, x_2Dgauss, point_x, widths)',
        'from __main__ import multiprocess, x_2Dgauss, point_x, widths').timeit(number=1))

benchmarks.append(timeit.Timer('multiprocess(3, x_2Dgauss, point_x, widths)',
        'from __main__ import multiprocess, x_2Dgauss, point_x, widths').timeit(number=1))

benchmarks.append(timeit.Timer('multiprocess(4, x_2Dgauss, point_x, widths)',
        'from __main__ import multiprocess, x_2Dgauss, point_x, widths').timeit(number=1))

benchmarks.append(timeit.Timer('multiprocess(6, x_2Dgauss, point_x, widths)',
        'from __main__ import multiprocess, x_2Dgauss, point_x, widths').timeit(number=1))
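
As a side note, timeit also accepts a callable or a `globals` argument, which avoids the `from __main__ import ...` setup strings. The sketch below uses a hypothetical stand-in function work rather than the benchmark functions above.

```python
import timeit

def work(n):
    """Hypothetical stand-in for an expensive function."""
    return sum(i * i for i in range(n))

# Pass a callable: no setup string needed.
elapsed = timeit.timeit(lambda: work(100000), number=3)
print('average seconds per call:', elapsed / 3)

# Or keep the statement as a string and hand over the namespace explicitly.
stmt_elapsed = timeit.timeit('work(1000)', globals=globals(), number=3)
```

In both forms, timeit returns the total time in seconds for all `number` runs.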

Preparing the plotting of the results

import platform

def print_sysinfo():

    print('\nPython version  :', platform.python_version())
    print('compiler        :', platform.python_compiler())

    print('\nsystem     :', platform.system())
    print('release    :', platform.release())
    print('machine    :', platform.machine())
    print('processor  :', platform.processor())
    print('CPU count  :', mp.cpu_count())
    print('interpreter:', platform.architecture()[0])
    print('\n\n')

from matplotlib import pyplot as plt
import numpy as np

def plot_results():
    bar_labels = ['serial', '2', '3', '4', '6']

    fig = plt.figure(figsize=(10,8))

    # plot bars
    y_pos = np.arange(len(benchmarks))
    plt.yticks(y_pos, bar_labels, fontsize=16)
    bars = plt.barh(y_pos, benchmarks,
         align='center', alpha=0.4, color='g')

    # annotation and labels

    for ba,be in zip(bars, benchmarks):
        plt.text(ba.get_width() + 1.4, ba.get_y() + ba.get_height()/2,
            '{0:.2%}'.format(benchmarks[0]/be),
            ha='center', va='bottom', fontsize=11)

    plt.xlabel('time in seconds for n=%s' %n, fontsize=14)
    plt.ylabel('number of processes', fontsize=14)
    t = plt.title('Serial vs. Multiprocessing via Parzen-window estimation', fontsize=18)
    plt.ylim([-1,len(benchmarks)+0.5])
    plt.xlim([0,max(benchmarks)*1.1])
    plt.vlines(benchmarks[0], -1, len(benchmarks)+0.5, linestyles='dashed')
    plt.grid()

    plt.show()

Results

plot_results()
print_sysinfo()

Python version  : 3.4.1
compiler        : GCC 4.2.1 (Apple Inc. build 5577)

system     : Darwin
release    : 13.2.0
machine    : x86_64
processor  : i386
CPU count  : 4
interpreter: 64bit

Conclusion

We can see that we could speed up the density estimations for our Parzen-window function if we submitted them in parallel. However, on my particular machine, the submission of 6 parallel processes doesn’t lead to a further performance improvement, which makes sense for a 4-core CPU.
We also notice that there was a significant performance increase when we were using 3 instead of only 2 processes in parallel. However, the performance increase was less significant when we moved up to 4 parallel processes.
This can be attributed to the fact that in this case, the CPU consists of only 4 cores, and system processes, such as the operating system, are also running in the background. Thus, the fourth core simply does not have enough capacity left to further increase the performance of the fourth process to a large extent. And we also have to keep in mind that every additional process comes with an additional overhead for inter-process communication.

Also, an improvement due to parallel processing only makes sense if our tasks are "CPU-bound", where the majority of the time is spent in the CPU, in contrast to I/O-bound tasks, e.g., tasks that spend most of their time reading data from a disk.
