Hippocamplus My Second Memory


Some notes on my recent attempt to learn Python.


To read the standard input:

import fileinput

for line in fileinput.input():

To read a file line by line:

for line in open(input_file, 'rt')

Or in one line:

lines = [line.rstrip('\n') for line in open(input_file, 'rt')]

To write a file:

f = open(output_file, 'wt') # 'a' for append mode
f.write('something' + str(an_integer) + '\n')

For gzipped files, use gzip module:

import gzip
f = gzip.open('file.txt.gz', 'rt')
f = gzip.open('file.txt.gz', 'wt')

To quickly read a manually indexed (e.g. grep -b) file quickly, use seek to jump to a byte-offset and tell to save the current byte-offset.

f = open(reads_fn)
line = f.next()

To save python objects one can use pickle module:

pickle.dump(obj1, open("obj1.pkl","wb"))
obj1 = pickle.load(open("obj1.pkl", "rb"))

Note the b that specifies binary mode which is slightly more efficient for non-text files.

To prompt the user for input

x = input('What is?')

Passing arguments to a script

The quick way is to use sys.argv like that:

import sys
in1 =  sys.argv[1]

The more fancy way is to use argparse. I usually use it like this (see the doc for a more complete example):

import argparse

parser = argparse.ArgumentParser(description='Do something cool.')
parser.add_argument('-in', dest='input', help='the input file', required=True)
parser.add_argument('-k', dest='k', default=3, type=int, help='an integer')
parser.add_argument('-bool', dest='bool', action='store_true', help='False by default')
parser.add_argument('-out', dest='output', help='the output file')

args = parser.parse_args()
print args.input
print args.k
print args.output

To use argparse but still read the standard input with fileinput:

import argparse
import fileinput

# argparse definition

for line in fileinput.input(files='-'):
    # something with 'line' and the arguments from argparse



arr = [3]
for elt in arr:
    print elt
  • list(myarray) to make a copy of the array without going through the copy module.

numpy arrays are more efficient when the data needs to be manipulated/combined. For example it allows for some vectorized operations:

pred_counts = numpy.zeros((4, len(rfc.classes_)))
pred_counts[0, ] += sum(predprobs > .5)

To convert a string into an array: numpy.fromstring(line, dtype=int, sep='\t'). The opposite would be '\t'.join(map(str, arr))


Hash tables or dictionaries holds unordered sets of key/value pairs.

a_hash = {'bob': 42, 'kevin': 7}


It’s much faster to check the presence of an element in a set than a list. For example:

all_words = set()
all_words.add('word') # etc
'word' in all_words



'tempString' + another_string + str(an_integer)
  • rstrip removes the trailing characters. By defaut, white spaces. s.rstrip('\t\n') removes any combination of tabs and new lines.
  • str.replace(a_string, ":", "_") to replace characters.
  • ' '.join(a_string_array) to merge an array into one string.

For regular expression:

import re
pattern = re.compile('something:(.+)')
mres = pattern.search(line)

To decode/encode a string, e.g. when reading/writing files and not using the text mode t:



def functionname( parameters ):
   return [expression]

Data frame (pandas)

df = pd.DataFrame([{'c1': 1, c2': 'val2'},
                   {'c1': 0, c2': 'val3'}])

df = DataFrame.from_csv('file.tsv, sep='\t', header=0)

df = df.sort_values(['c1'], ascending=False)

df.to_csv('file.tsv', sep='\t', index=False)

for row in df.itertuples():
    # something with row.c1


import threading

def task(x, results, index):
    results[index] = SOMETHING WITH X

threads = [None] * 10
results = [None] * 10

## assuming the inputs are in an array called inputs
for ii in range(len(threads)):
    threads[ii] = threading.Thread(target=task, args=(inputs[ii], results, ii))
for ii in range(len(threads)):

Naming and formatting

  • Use lowercase _-separated for modules and functions name. E.g. my_function.
  • Use CapsWord for classes. E.g. MyClass.
  • Use _ prefix for private variable. E.g. _secret_variable.

Module, files, imports

There should be a __init__.py file in each directory containing modules to import. It can be empty. If not the code inside is run on import.

To import the classes/functions defined in file module1.py, simply do

import module1

Packaging code

Minimal structure:

├── __init__.py
├── mypackage
│   ├── __init__.py
│   ├── submod1.py
│   ├── submod2.py
├── setup.py

__init__.py files can be (and usually are) empty. The setup.py contains metadata about the package and list dependencies and files to includes. Once setup, the package can be installed locally with pip install .. Other functions exist to produce a tarball or upload the code to pypi.

More details at “How to package your python code”.

Install a module locally

For example when running a script on a HPC cluster, it’s often easier to install modules in your home.

If the package can be installed with pip, I used pip install --user packageName.

Otherwise using the setup.py method, I first initialize the local library by creating a directory and updating PYTHONPATH.

mkdir -p /home/monlongj/pylib/lib/python2.7/site-packages/

Then to install a module (e.g. pyfaidx) I did:

wget https://github.com/mdshw5/pyfaidx/archive/v0.4.7.1.tar.gz
tar -xzvf v0.4.7.1.tar.gz
cd pyfaidx-
python setup.py install --prefix=/home/monlongj/pylib

To use this, I must always have the local library in PYTHONPATH.

Modules (as in module load ...) might be a cleaner solution. I use existing module but I didn’t take the time to see how I could create and use them more.


An histogram with matplotlib:

import matplotlib.pyplot as plt
plt.xlabel('x label')
plt.ylabel('y label')


plt.plot(xy[1], xy[0])

Shell integration

List files with glob, remove with os, remove non-empty directories with shutil.

import glob
import os
import shutil

filelist = glob.glob('temp*')
for f in filelist:

shutil.copy(my_file, a_copy_of_my_file)
shutil.copytree(my_dir, a_copy_of_my_dir)

	file = open(filen)
new_path = os.path.join(my_dir, my_basename)

# create directory if it doesn't exist
if not os.path.exists(my_dir):

Run commands with subprocess

Use run with check=True (to stop on errors).

import subprocess

bwa_cmd = ['bwa', 'mem', bwaidx_file, fastq_file]
bwa_out = subprocess.run(bwa_cmd, check=True)

To avoid large error in the stderr stream, either catch or redirect it. Same for stdout.

out_file = open('output.txt', 'w')
bwa_out = subprocess.run(cmd, check=True, stdout=out_file, stderr=subprocess.DEVNULL)

To catch errors as a CalledProcessError exception:

    subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
    # do something else with e.returncode for example

To capture the output (Python >= 3.7):

run_o = subprocess.run(cmd, check=True, capture_output=True)
## something with run_o.stdout or run_o.stderr

Resource monitoring

For logging, add date to the output with:

import datetime

Timing a function with timeit:

from timeit import Timer
def test():
	global myobj

A built-in way to monitor resident memory usage:

import tracemalloc
# do something
mem_size, peak_mem_size = tracemalloc.get_traced_memory()
peak_mem_gb = round(peak_mem_size/(1024*1024*1024), 5)


BioPython main documentation is available here. What I ended up using are the following.


from Bio.Seq import MutableSeq
from Bio.Alphabet import generic_dna

mseq = MutableSeq('ATGCTAGCT', generic_dna)

To simulate sequences, I ended up using numpy arrays (I think they could take a list as indexes).

import numpy
import random
from Bio.Seq import MutableSeq
from Bio.Alphabet import generic_dna

_nuc = numpy.array(["A", "T", "C", "G"])
def randSeq(length):
    seqArray = _nuc[[int(random.random()*4) for i in xrange(length)]]
    return(MutableSeq("".join(seqArray), generic_dna))


To read a fasta file:

from Bio import SeqIO

for record in SeqIO.parse(fasta_file, "fasta"):
    print record.id + '\t' + str(len(record.seq))

To write a fasta:

from Bio.SeqRecord import SeqRecord

recs = []
for ii in xrange(1000):
    seq = randSeq(100))
    recs.append(SeqRecord(seq, id=str(ii)))
SeqIO.write(recs, "reads.fa", "fasta")

To read/write a gzipped fasta file:

inf = gzip.open('input.fa.gz', 'rt')
recs = []
for rec in SeqIO.parse(inf, 'fasta'):
SeqIO.write(recs, gzip.open('reads.fa.gz', 'wt'), "fasta")

Indexed fasta

I use pyfaidx (see repo) to quickly access slices of an indexed fasta. The indexing can be performed by samtools faidx or functions from the package.

from pyfaidx import Fasta

fa = Fasta(args.ref)
fa = fa[str(ch)][start:end]
print fa.id + '\t' + fa.seq

SAM files

Here is a short example on how to get reads from a region.

import pysam

bamfile = pysam.AlignmentFile(bam_fn, "rb")

reads_reg = bamfile.fetch(reference=ch_reg, start=start_reg, end=end_reg)
reads_seq = {}
for read in reads_reg:
    reads_seq[read.query_name] = read.query_alignment_sequence

To iterate over all the reads (faster) use:

for aln in bamfile.fetch(until_eof=True):

If the BAM is indexed, the number of mapped and unmapped reads is stored in bamfile.mapped and bamfile.unmapped.



I now use this most of the time.

Example of reading and annotating a VCF with cyvcf2:

vcf = VCF(in_vcf_path)
vcf.add_info_to_header({'ID': 'NEW_INFO',
                        'Description': 'Description of info',
                        'Type': 'Float', 'Number': '1'})
vcf_o = Writer(out_vcf_path, vcf)
for variant in vcf:
    # something with :
    # variant.CHROM, variant.start, variant.end, variant.REF, variant.ALT[0]
    # variant.format('AD')[0]
    svtype = variant.INFO.get('SVTYPE')
    if svtype is not None and svtype == 'BND':
    variant.INFO["NEW_INFO"] = VAL


I used to use the PyVCF module.

import vcf
vcf_reader = vcf.Reader(open('var.vcf', 'r'))
for record in vcf_reader:
	if(record.INFO['AF'][0] < 0.1):
		print record.ID

The fields are accessed through:

  • record.CHROM
  • record.POS
  • record.ID
  • record.REF
  • record.ALT
  • record.QUAL
  • record.FILTER
  • record.INFO


  • Sort elements with sorted(a_list, key=lambda k: -something[k]).
  • Count unique elements in an array: unique, counts = numpy.unique(array1, return_counts=True).
  • Sub-sample with random.sample(a_list, 10).
  • In a loop, jump to the next iteration with continue, or leave the loop with break.
  • Find indexes in array: numpy.where(x > 0).
  • quit() to stop a program.
  • myVar is None to test if a variable is None.
  • Print the working directory with os.getcwd() (after importing os).
  • To get a ~19 digit-long hash of a string: hash(my_very_long_id).

When filling a nested dictionary, it’s painful to always test if the key exists before updating it’s value. One trick is to use try/except. It’s not that much quicker but it looks fancier so you forget about the pain:

except KeyError:
    dict[lev1] = [i]

To time a function, one simple way is to use time.time() before and after the function and report the difference. There might be issues with Windows but I use timing for internal benchmarks, never in final code.

To convert a binary number into a decimal number: int('11011011', 2).

To get the index as well as the value in a list, use enumerate:

for id, val in enumerate(myList):
	print(id, val)

Jupyter notebooks

  • %%capture at the beginning of a cell means the stdout/stderr will be captured (i.e. not shown).
  • !pip install XXX to install packages.

The main keyboard shortcuts I use:

  • Ctrl+Enter run cell.
  • Shift+Enter run cell and select below.
  • b insert cell below.
  • Switch cell mode to:
    • y code.
    • m markdown.
    • 1 to heading 1
  • d d delete cell. Undo cell deletion with z.
  • i i interrupt kernel.
  • 0 0 restart kernel.
  • h Show keyboard help.
  • In edit mode
    • Ctrl+] ([) indent (deindent)
    • Ctrl+Shift+- split cell