// each cell has a gid and each piece has a special spgid which will be
// equal to gid on the piece containing the output port. The idea is that
// from a spgid, one can derive the gid.
// Also the set of sid for each cell is encoded to make one cells set distinct
// from any other cell.
if (name_declared("localloadfile")) {
begintemplate CellBalancePiece
public sid, idvec, cx, host, merge
objref idvec
proc init() {local i, n
sid = $2
host = -1
cx = $o1.scanvar
n = $o1.scanvar
idvec = new Vector()
for i=0, n-1 {
// all subtrees of the idvec connected to 0 of the first idvec
// $o1 is the cell SectionRef list
proc merge() {local i localobj sr
if(0) for i=0,idvec.size-1 {
sr = $o1.object(idvec.x[i])
sr.parent printf("original %s connect ", secname())
sr.sec printf("%s(%g), (%g) : %d\n", secname(), section_orientation(), parent_connection(), idvec.x[i])
for i=1, idvec.size-1 {
sr = $o1.object(idvec.x[i])
//sr.sec {printf("disconnect %d %s\n", i, secname())}
//$o1.object(idvec.x[0]).sec { printf("reconnect to %s(0)\n", secname())}
sr.sec { disconnect() }
$o1.object(idvec.x[0]).sec { connect sr.sec(0), 0 }
endtemplate CellBalancePiece
begintemplate CellBalanceInfo
public gid, cx, subtrees, pcx, host, distinct_hosts, nsid, spgid
public multisplit, tmphost
external multisplit_divide
objref subtrees
proc init() {local isid, nsub, isub
tmphost = -1 // prevent multiple pieces on one host for trial
host = -1
subtrees = new List()
gid = $o1.scanvar
spgid = gid
cx = $o1.scanvar
nsid = $o1.scanvar
for isid = 0, nsid-1 {
nsub = $o1.scanvar
for isub = 0, nsub-1 {
subtrees.append(new CellBalancePiece($o1, isid))
if (subtrees.count == 1) {
// printf("%d only has 1 piece. Do not split\n", gid)
nsid = 0
func pcx() {local i, c
c = 0
if (subtrees.count == 0 || $1 == 0) {
c = cx
}else for i = 0, subtrees.count-1 {
c += subtrees.object(i).cx
return c
func distinct_hosts() {local i, j, h
for i=0, subtrees.count-2 {
h = subtrees.object(i).host
for j=i+1, subtrees.count-1 {
if (h == subtrees.object(j).host) {
return 0
return 1
// $o1 NetCon with cell output source
// $2 is the binfo msgid.
// $o3 is the ParallelContext
// on entry the entire cell exists, on exit only the subtrees
// for this host. spgid will be gid for the subtree left containing
// the output port and otherwise will be derived from $2 and the subtree index
// The spgid will be registered for this machine and associated with the
// root of the subtree (but without it being an output port). The exception
// is that the subtree with the output port will have gid registered and
// be associated with that output port.
proc multisplit() {localobj srout, cell
$o1.preloc() srout = new SectionRef() pop_section()
cell = $o1.precell
// spgid = gid + $2*(subtree_index + 1) except the subtree
// with the output port has spgid = gid
msdiv(cell, srout, $2, $o3)
if (srout.exists()) {
$o3.cell(gid, $o1, 1)
//srout.sec printf("%d gid=%d for %s\n", $o3.id, gid, secname())
proc msdiv() {local i, sid, spgid, msgid \
localobj cell, srout, pc, sl, allsr, srlist, sidvec, sr, vsav, hosts, nil
cell = $o1
srout = $o2
msgid = $3
pc = $o4
allsr = new List()
sl = new SectionList()
// if top level cell use currently accessed section
if (object_id(cell) == 0) {
if (section_exists("soma", cell)) {
$o1.soma { sl.wholetree }
}else{ // at least there should be an all SectionList
i = 0
forsec cell.all {
if (i == 0) {
i += 1
forsec sl { allsr.append(new SectionRef()) }
vsav = new Vector(allsr.count)
for i=0, subtrees.count-1 {
for i=0, allsr.count-1 allsr.object(i).sec {
vsav.x[i] = v(.0001)
v(.0001) = -1
srlist = new List()
sidvec = new Vector()
hosts = new Vector()
sid = 0
for i=1, subtrees.count-1 {
sr = allsr.object(subtrees.object(i).idvec.x[0])
sr.parent if (v(.0001) == -1) {v(.0001) = sid sid += 1 }
for (i = subtrees.count-1; i > 0; i -= 1) {
sr = allsr.object(subtrees.object(i).idvec.x[0])
sr.parent { sidvec.append(gid*100 + v(.0001)) }
allsr.object(0).root srlist.append(new SectionRef())
if (0 && pc.id == 0) for i=0, sidvec.size-1 srlist.object(i).sec {
printf("%d %ld %d %s\n", i, sidvec.x[i], hosts.x[i], secname())
multisplit_divide(sidvec, srlist, hosts)
for i=0, allsr.count -1 {
sr = allsr.object(i)
if (sr.exists) sr.sec {
v(.0001) = vsav.x[i]
for i=0, srlist.count-1 {
sr = srlist.object(i)
if (sr.exists()) {
spgid = gid + msgid*(i + 1)
if (srout.exists()) srout.root if (sr.is_cas()) {
spgid = gid
pc.set_gid2node(spgid, pc.id)
if (spgid != gid) sr.sec {
pc.cell(spgid, new NetCon(&v(.5), nil), 0)
//printf("%d spgid=%d for %s\n", pc.id, spgid, secname())
} // else pc.cell will be called on outport after return
endtemplate CellBalanceInfo
begintemplate BalanceInfo
public bilist, cx, npiece, metis_weight, run_metis, metis_hosts, distinct_hosts
public nhost, stat, basename, write_balhost, msgid, write_hostcontext
public items, gids, sindices, mymetis, mymetis2, mybal
public base_gid, thishost_gid, gid2cx, write_colgid
objref bilist
strdef basename
objref items, gids, sindices // when only a few are read
proc init() {
msgid = -1
nhost = -1
ihost = -1
bilist = new List()
if (numarg() == 0) {
basename = $s1
if (numarg() == 1) {
}else if (numarg() == 3) {
read_host($2, $3)
proc read_all() {local i1, i2, n1, n2 localobj f, s
f = new File()
s = new String()
sprint(s.s, "%s.dat", basename)
n1 = f.scanvar
for i1=0, n1 - 1 {
n2 = f.scanvar
for i2=0, n2 -1 {
bilist.append(new CellBalanceInfo(f))
proc read_host() {local i, n, k, i1, i2, n1, n2 localobj f, s, cb
items = new Vector()
gids = new Vector()
sindices = new Vector()
f = new File()
s = new String()
sprint(s.s, "%s.%d.dat", basename, $2)
msgid = f.scanvar
nhost = f.scanvar
ihost = $1
for i=0, $1-1 {f.gets(s.s)}
if (f.scanvar != $1) { execerror("read_host() format error", "") }
n = f.scanvar // number of triples
for i=0, n-1 {
if (0) {
printf("msgid=%d nhost=%d ihost=%d\n", msgid, nhost, ihost)
print "items" items.printf
print "gids" gids.printf
print "sindices" sindices.printf
if (items.size == 0) { return }
i = 0
k = 0
sprint(s.s, "%s.dat", basename)
n1 = f.scanvar
for i1=0, n1 - 1 {
n2 = f.scanvar
for i2=0, n2 - 1 {
if (k == items.x[i]) {
cb = new CellBalanceInfo(f)
//printf("k=%d items.x[%d]=%d gid=%d %d\n", k, i, items.x[i], gids.x[i], \
while(1) {
if (gids.x[i] != cb.gid) {
execerror("gid inconsistency in read_host ", "")
cb.host = ihost
cb.subtrees.object(sindices.x[i]).host = ihost
i += 1
if ( i >= items.size) {
if (k != items.x[i]) { break }
s = new CellBalanceInfo(f) // skip
k += 1
proc metis_weight() {local i, j, c localobj cb, f, s
f = new File()
s = new String()
sprint(s.s, "%s.wt.dat", basename)
f.printf("%d 0 10\n", npiece())
for i = 0, bilist.count - 1 {
cb = bilist.object(i)
if (cb.subtrees.count == 0) {
f.printf("%g\n", cb.cx)
} else for j=0, cb.subtrees.count-1 {
f.printf("%g\n", cb.subtrees.object(j).cx)
proc run_metis() {localobj s
s = new String()
sprint(s.s, "pmetis %s.wt.dat %d", basename, $1)
proc metis_hosts() {local i, j localobj f, s, cb
s = new String()
f = new File()
sprint(s.s, "%s.wt.dat.part.%d", basename, $1)
if (f.ropen(s.s) == 0) {
for i = 0, bilist.count - 1 {
cb = bilist.object(i)
if (cb.subtrees.count == 0) {
cb.host = f.scanvar
}else for j=0, cb.subtrees.count - 1 {
cb.subtrees.object(j).host = f.scanvar
nhost = $1
// Metis seems to be quite order dependent. In fact ordering from greatest
// to least can improve things considerably. I wonder if we can do this
// "simple" problem just as well with a trivial algorithm
proc mymetis() {local i, j, k, ncpu, th localobj wt, cb, cpu, wcpu, icell
wt = new Vector()
icell = new Vector()
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.subtrees.count == 0) {
} else for j=0, cb.subtrees.count-1 {
cpu = new Vector(wt.size)
wcpu = new Vector($1)
ncpu = 1e9
// do some trials with increasing threshold til it fits into $1
for (th = wt.sum/$1; ncpu > $1; th += th/100) {
ncpu = trial(th, wt, cpu, wcpu, icell)
//printf("wt.sum=%g wcpu.sum=%g\n", wt.sum, wcpu.sum)
//printf("balance = %d %% ncpu=%d\n", (wcpu.max/(wt.sum/$1) - 1)*100, ncpu)
k = -1
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.subtrees.count == 0) {
cb.host = cpu.x[k += 1]
} else for j=0, cb.subtrees.count-1 {
cb.subtrees.object(j).host = cpu.x[k += 1]
nhost = $1
proc mymetis2() {local i, j, k, ncpu, th, nsrt \
localobj wt, cb, cpu, wcpu, icell, srt, wsrt, pcnt, cells, pindex
wt = new Vector()
icell = new Vector()
pindex = new Vector()
for i=0, bilist.count-1 {
cb = bilist.object(i)
cb.host = -1
if (cb.subtrees.count == 0) {
} else for j=0, cb.subtrees.count-1 {
cb.subtrees.object(j).host = -1
srt = wt.sortindex.reverse
printf("%d pieces size max=%g min=%g\n", wt.size, wt.x[srt.x[0]], wt.x[srt.x[wt.size-1]])
// always put the next piece in the least filled cpu
ncpu = $1
cpu = new Vector(wt.size)
wcpu = new Vector($1)
pcnt = new Vector($1)
for i=0, wt.size-1 { // i is cell index
j = wcpu.min_ind
//if j already has a piece from this cell, choose another
// No. allow multiple pieces on same cpu
if (0) j = distinct_piece(j, wcpu, bilist.object(icell.x[srt.x[i]]))
wcpu.x[j] += wt.x[srt.x[i]]
cpu.x[srt.x[i]] = j
pcnt.x[j] += 1
if (pindex.x[srt.x[i]] == -1) {
bilist.object(icell.x[srt.x[i]]).host = j
bilist.object(icell.x[srt.x[i]]).subtrees.object(pindex.x[srt.x[i]]).host = j
printf("ncell = %d nhost = %d\n", i, $1)
printf("max and min complexity %g %g avg = %g\n", wcpu.max, wcpu.min, wcpu.mean)
printf("load balance %g\n", wcpu.max/wcpu.mean)
printf("# max and min # cells on cpu %d %d\n", pcnt.max, pcnt.min)
k = -1
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.subtrees.count == 0) {
cb.host = cpu.x[k += 1]
} else for j=0, cb.subtrees.count-1 {
cb.subtrees.object(j).host = cpu.x[k += 1]
nhost = $1
func distinct_piece() {local i, j, savx localobj piece
j = $1
for i=0, $o3.subtrees.count-1 {
piece = $o3.subtrees.object(i)
if (piece.host == j) { // choose another
printf("not distinct on %d, choose another\n", $1)
savx = $o2.x[$1]
$o2.x[$1] = 1e9
j = distinct_piece($o2.min_ind, $o2, $o3)
$o2.x[$1] = savx
return j
return j
// read a weight file in metis format and do our own balance
// $s1 weight file name, $2 fits into that size nhost
proc mybal() { local i, ncpu, th localobj wt, cb, cpu, wcpu, f
execerror("no longer works. implement icell", "")
f = new File()
wt = new Vector()
wt.resize(f.scanvar()) f.scanvar() f.scanvar()
for i = 0, wt.size-1 {
wt.x[i] = f.scanvar()
cpu = new Vector(wt.size)
wcpu = new Vector($2)
ncpu = 1e9
// do some trials with increasing threshold til it fits into $2
for (th = wt.max; ncpu > $2; th += th/100) {
ncpu = trial(th, wt, cpu, wcpu)
printf("wt.sum=%g wcpu.sum=%g\n", wt.sum, wcpu.sum)
printf("balance = %.2f ncpu=%d\n", wcpu.max/(wt.sum/$2), ncpu)
// return number of cpus filled based on first threshold arg
// $o2 is the sorted wt vector
// $o3 is returned as the vector of cpu#
// $o4 is returned as the weight on each cpu
// $o5 is parallel to wt and gives the index into bilist
// for purposes of avoiding more than one cell piece on a cpu
func trial() {local iwt, icpu, w, wcpu, i \
localobj tobj, filled, wt, srt, cpu, cand, wcand, icell
th = $1
wt = $o2.c
srt = wt.sortindex.reverse
cpu = $o3
icell = $o5
for i=0, bilist.count-1 { bilist.object(i).tmphost = -1 }
// minimum threshold is the largest weight
if (th < wt.x[srt.x[0]]) { th = wt.x[srt.x[0]] }
filled = wt.c.fill(0)
icpu = 0
iwt = 0
wcpu = 0 // weight on icpu
while (iwt < wt.size) {
if (filled.x[srt.x[iwt]]) { iwt += 1 continue }
w = wt.x[srt.x[iwt]]
if (trial_large(wcpu + w <= th, icpu, bilist.object(icell.x[srt.x[iwt]]).tmphost)) {
//printf("fill iwt=%d wcpu=%g w=%g icpu=%d\n", iwt, wcpu, w, icpu)
tobj = bilist.object(icell.x[srt.x[iwt]])
if (tobj.tmphost == icpu) {
printf("can fill problem with %d\n", icell.x[srt.x[iwt]])
tobj.tmphost = icpu
wcpu += w
filled.x[srt.x[iwt]] = 1
wt.x[srt.x[iwt]] = 1e9 // do not use again
cpu.x[srt.x[iwt]] = icpu
iwt += 1
}else{ // does not fit
// is a smaller one available
cand = wt.c.indvwhere("<=", th - wcpu)
if (trial_small(wt, cand, icpu, &i, tobj, icell)) {
tobj = bilist.object(icell.x[i])
if (tobj.tmphost == icpu) {
printf("smaller problem with %d\n", icell.x[i])
tobj.tmphost = icpu
//printf("max i=%d size=%d wcpu=%g max=%g\n", i, cand.size, wcpu, wcand.max)
wcpu += wt.x[i]
wt.x[i] = 1e9
cpu.x[i] = icpu
filled.x[i] = 1
// guess that cpu is filled as much as feasible
//printf("filled iwt=%d wcpu=%g icpu=%d\n", iwt, wcpu, icpu)
if(wcpu == 0) { execerror("error in trial", "") }
wcpu = 0
icpu += 1
// may be a last one that did not get into wcpu
if (wcpu) {
icpu += 1
// printf("trial $1=%g th=%g ncpu=%d\n", $1, th, icpu)
return icpu
func trial_large() {
if ($1 == 0) { return 0 }
if ($2 == $3) {
// printf("trial_large problem fixed\n")
return 0
return 1
func trial_small() {local i, icpu localobj wt, cand, tobj, wcand, icell
wt = $o1
cand = $o2
icpu = $3
tobj = $o5
icell = $o6
while (cand.size) {
wcand = cand.c.index(wt, cand)
i = cand.x[wcand.max_ind]
tobj = bilist.object(icell.x[i])
if (tobj.tmphost == icpu) {
// printf("trial_small problem with %d, fixed\n", icell.x[i])
$&4 = i
return 1
return 0
proc distinct_hosts() { local i
for i=0, bilist.count-1 {
if (bilist.object(i).distinct_hosts == 0) {
printf("hosts for bilist[%d] not distinct\n", i)
func cx() {local i, c, j
j = 0
if (numarg() == 1) {
j = $1
c = 0
for i = 0, bilist.count-1 {
c += bilist.object(i).pcx(j)
return c
func npiece() {local i, n
n = 0
for i=0, bilist.count-1 {
if ( bilist.object(i).subtrees.count == 0) {
n += 1
n += bilist.object(i).subtrees.count
return n
proc stat() {local i, j, c, cmax, mcp, mxs, mcpi, mxsi localobj cxvec, npvec, cb, p
if (nhost == -1) return
mcp = 1
mxs = 0
cxvec = new Vector(nhost)
npvec = new Vector(nhost)
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.nsid > mxs) {
mxs = cb.nsid
mxsi = i
if (cb.subtrees.count > mcp) {
mcp = cb.subtrees.count
mcpi = i
if (cb.subtrees.count == 0) {
cxvec.x[cb.host] += cb.cx
npvec.x[cb.host] += 1
}else for j=0, cb.subtrees.count-1 {
p = cb.subtrees.object(j)
cxvec.x[p.host] += p.cx
npvec.x[p.host] += 1
c = cx()
cmax = cxvec.max
printf("total complexity is %g\n", c)
printf("%d cells\n", bilist.count)
printf("%d pieces\n", npiece())
printf("maximum complexity is %g for host %g\n", cmax, cxvec.max_ind)
printf("load imbalance is %.0f%%\n", 100*cmax*nhost/c - 100)
printf("maximum of %d pieces on host %g\n", npvec.max, npvec.max_ind)
if (mcp > 1) {
printf("at least one cell is broken into %d pieces (bilist[%d], gid %d)\n", mcp, mcpi, bilist.object(mcpi).gid)
printf("at least one cell has %d sids (bilist[%d], gid %d)\n", mxs, mxsi, bilist.object(mxsi).gid)
printf("no broken cells\n")
// write a host file with the format
// max_gid
// nhost
// ihost ngid for that host followed by a list of (index, gid, subtreeindex) triples
// the indices will be in order and the gid is just for error checking
// the idea is to allow the parallel run to not have to save all the info
proc write_balhost() {local i, j, mg localobj hostgids, cb, p, f, s
if (nhost == -1) return
hostgids = new List()
for i=0, nhost-1 { hostgids.append(new Vector()) }
mg = -1
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.gid > mg) {
mg = cb.gid
for (msgid=10; msgid <= mg; msgid *= 10) {}// encode gid in spgid along with subtree index
if (msgid < 1e7) {
msgid = 1e7 // there may be unused gids in BlueBrain
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.subtrees.count == 0) {
}else for j=0, cb.subtrees.count-1 {
p = cb.subtrees.object(j)
s = new String()
sprint(s.s, "%s.%d.dat", basename, nhost)
f = new File()
f.printf("msgid %d\n", msgid)
f.printf("nhost %d\n", hostgids.count)
for i=0, hostgids.count-1 {
p = hostgids.object(i)
f.printf("%d %d", i, p.size/2)
for (j = 0; j < p.size; j += 2) {
f.printf(" %d %d %d", p.x[j], bilist.object(p.x[j]).gid, p.x[j+1])
// derived from write_balhost but output format is suitable for
// the fast_create.hoc file
proc write_colgid() {local i, j, mg localobj hostgids, cb, p, f, s
if (nhost == -1) return
hostgids = new List()
for i=0, nhost-1 { hostgids.append(new Vector()) }
mg = -1
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.gid > mg) {
mg = cb.gid
for (msgid=10; msgid <= mg; msgid *= 10) {}// encode gid in spgid along with subtree index
if (msgid < 1e7) {
msgid = 1e7 // there may be unused gids in BlueBrain
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.subtrees.count == 0) {
}else for j=0, cb.subtrees.count-1 {
p = cb.subtrees.object(j)
s = new String()
sprint(s.s, "%s.%d.dat", basename, nhost)
f = new File()
f.printf("ncell %d\nnhost %d\n", bilist.count, hostgids.count)
for i=0, hostgids.count-1 {
p = hostgids.object(i)
for (j = 0; j < p.size; j += 2) {
f.printf("%d %d\n", bilist.object(p.x[j]).gid, i)
// just enough basename.ncpu.dat to get complete host communication for
// specified host arg. 2nd arg is the nhost for the original basename.nhost.dat
// file
// $3 is the level
proc write_hostcontext() {local i, j, il, ngid, gid, host \
localobj f, s, cb, hostgids, gvec, mark, marked, gidvec, hostvec, gi
//first, read the complete basename.$2.dat file
f = new File()
s = new String()
sprint(s.s, "%s.%d.dat", basename, $2)
msgid = f.scanvar
nhost = f.scanvar
gidvec = new Vector()
hostvec = new Vector()
hostgids = new List()
mark = new Vector(nhost)
for i=0, nhost-1 {
if (i != f.scanvar) { execerror("bad format for ", s.s) }
gvec = new Vector()
ngid = f.scanvar
for j=0, ngid-1 {
gid = f.scanvar
// now mark the hosts we need, ie. every host that has a gid
// referred to by $1
mark.x[$1] = 1
gvec = hostgids.object($1)
domark(gvec, gidvec, hostvec, mark)
for il=1, $3-1 {
// try the second level
gi = mark.c.indvwhere("==", 1)
for i=0, gi.size-1 {
domark(hostgids.object(gi.x[i]), gidvec, hostvec, mark)
// print the reduced basename.ncpu.dat file
marked = mark.c.indvwhere("==", 1)
sprint(s.s, "%s.%d.dat", basename, marked.size)
f.printf("msgid %d\n", msgid)
f.printf("nhost %d\n", marked.size)
for i=0, marked.size-1 {
j = marked.x[i]
gvec = hostgids.object(j)
f.printf("%d %d", i, gvec.size/3)
// f.printf("%d %d", marked.x[i], gvec.size/3)
for (j=0; j < gvec.size; j += 3) {
f.printf(" %d %d %d", gvec.x[j], gvec.x[j+1], gvec.x[j+2])
print "wrote ", s.s
proc domark() {local i, j, gid localobj gvec, gidvec, hostvec, mark, gi
gvec = $o1 gidvec = $o2 hostvec=$o3 mark=$o4
for (i=0; i < gvec.size-1; i += 3) {
gid = gvec.x[i+1]
gi = gidvec.c.indvwhere("==", gid)
for j=0, gi.size-1 {
mark.x[hostvec.x[gi.x[j]]] = 1
func base_gid() {
return $1 % msgid
func thishost_gid() {local i
if ($1 >= msgid) { return $1 }
i = gids.indwhere("==", $1)
if (i < 0) { return -1 }
return bilist.object(i).spgid
func gid2cx() {local i localobj cb
for i=0, bilist.count-1 {
cb = bilist.object(i)
if (cb.spgid == $1) {
return cb.cx
return 0
endtemplate BalanceInfo
proc mkbalinfo() { localobj bi
bi = new BalanceInfo($s1)
if (0) {
proc mkcolgid() { localobj bi
bi = new BalanceInfo($s1)
proc mymetis2() { localobj bi
bi = new BalanceInfo($s1)
proc mymetis3() { localobj bi
bi = new BalanceInfo($s1)
objref bi
if (0) {
bi = new BalanceInfo("balinfo")
print "total cell complexity = ", bi.cx()
print "total piece complexity = ", bi.cx(1)
print bi.npiece(), " pieces"
print bi.bilist.count, " cells"
if (0) {
objref bi
bi = new BalanceInfo("balinfo", 21, 1024)
if (0) {
objref bi
bi = new BalanceInfo("balinfo")
bi.write_hostcontext(860, 1024, 5)
if (0) {
bi = new BalanceInfo("balbb520")
if (0) {
bi = new BalanceInfo("bi")
if (0) {
bi = new BalanceInfo("cx")