DP: A library for building portable,
reliable distributed applications

David Arnow,
Department of Computer and Information Science
Brooklyn College City University of New York
Brooklyn, New York 11210

e-mail: arnow@sci.brooklyn.cuny.edu
ABSTRACT: DP is a library of process management and communication tools for writing portable, reliable distributed applications. It provides support for a flexible set of message operations as well as process creation and management. It has been successfully used in developing distributed Monte Carlo, disjunctive programming and integer goal programming codes. It differs from PVM and similar libraries in its support for lightweight, unreliable messages, as well as asynchronous delivery of interrupt-generating messages. In addition, DP supports the development of long-running distributed applications tolerant of the failure or loss of a subset of their processors.


Publication Information: This Brooklyn College Technical Report was presented at the Winter USENIX conference in New Orleans in 1995 and appeared in the conference proceedings: DP - a library for building reliable, portable distributed programming systems. Proceedings of the USENIX Winter '95 Technical Conference. New Orleans, Jan. 1995.

Acknowledgment of Support: This work was in part supported by PSC-CUNY Grant number 6-63278, PSC-CUNY Grant number 6-64184.

(1) Distributed Programming Tools

Although the increase in diversity and availability of parallel multiprocessors shows no sign of abatement, the one truly ubiquitous parallel computer system continues to be the LAN of workstations and the one massively parallel system to which "everyone" has access, if not authorization, is the Internet. Recognition of network computing as an important platform for parallel computing and the desirability of high-level and portable programming systems has resulted in the widespread development of a host of message-passing based programming environments.

Initially, much of this effort went into the design of programming languages or language extensions. An extensive review of these is given by Bal [Bal89]. Each language, besides providing a higher semantic level and portability, expresses a view as to how a distributed program ought to be conceived. These views may be limiting. For example, many languages (e.g. SR [Andrews82]) are strongly influenced by the semantic restrictions (synchronous message passing) advocated by Hoare [Hoare78]. Other languages adopt an asynchronous message semantics (for example, NIL [Strom83]). Still others hide message passing altogether, and present a paradigm different from that of a distributed system. Most notable among these are parallel logic programming languages such as PARLOG [Clark88], the shared memory model of LINDA [Gelernter85], or more recently Concert/C [Auerbach92; Goldberg93].

Languages for distributed systems are necessarily designed with a particular paradigm in mind and as such must impose some restrictions in order to maintain the integrity of that paradigm. The portability of their implementation is not trivial. Perhaps most significantly, new languages require a substantial reinvestment on the part of users. Therefore, as the locus of interest in parallel network computing changed from language designers to users, there has been a shift to the design of libraries of standard routines or of environments consisting of supporting processes as well as libraries. While the programmer is no longer protected by a language, greater flexibility and portability can be achieved.

One such environment is PVM [Sunderam90; Geist92], which is implemented on a variety of Unix systems and enjoys extensive use by computational scientists. Others include NMP [Marsland91] and P4 [Butler92]. The performance of these systems and others has recently been reviewed in two papers [Douglas93; Parsons93]. In response to both the proliferation of such environments and the use of PVM as a de facto standard, the past two years have seen an effort to develop a standard for these environments, MPI [MPI93].

All of these environments provide varying degrees of flexibility, portability and scalability, with PVM providing the most. However, none of them offer the flexibility that my applications required. Furthermore, none offer the kind of reliability that is necessary for conveniently scaling up to long computations involving many workstations.

(2) Wanted: Portability, Flexibility and Reliability

Portability. DP was developed as a result of my own experiences writing distributed programs that ran on LANs and the Internet itself from 1988-1991. These programs included Monte Carlo and other scientific calculations as well as operations research programs. Writing the process management and communication code directly in the native system primitives was maddeningly non-portable even though all the systems involved were either some flavor of Unix or inspired by Unix. The programs were parallelizations of large existing codes, and the necessary interprocess communication was embedded deeply so rewriting these programs in a different language was out of the question.

Flexibility. Most frustrating was the loss of flexibility (with respect to use of the native system primitives) that results from the use of any of the other distributed languages or programming environments available then and now. The communication facilities available in these systems (and described in the proposed MPI standard) do not support interrupting messages. Thus, a process receiving a message must invoke a receive operation explicitly at each point that a message is sought. The receive operation is typically allowed to be blocking or non-blocking, so both barriers and polling are readily available to the programmer.

In situations where a process cannot proceed at all until data from an incoming message has been received, these message semantics pose no problem- it is entirely natural to explicitly encode message receive operations just prior to the use of the needed data in the program and entirely proper for these operations to be blocking.

However, there are situations, for example in the Monte Carlo and disjunctive programming applications in which I was interested, where:
(a) incoming data serves "merely" to increase the efficiency of the processes computation
and
(b) it is not certain that the incoming data will arrive at all!
In these situations, it is both unnatural and extremely inefficient to explicitly encode non-blocking receive operations in the process's application code. In some cases, the problem is mitigated by the availability of threads. A dedicated input thread can integrate the contents of an incoming message into the process's data objects without the need for polling. However, it may be that the only way to efficiently respond to the new information is for the main thread to make an abrupt change in its control, i.e. to make a sudden jump out of its current nested routine stack. In the absence of an inter-thread signaling facility, there is no way for the main thread to recognize the need for this short of testing an object in its own address space- cheaper than executing non-blocking receives, but still inefficient.

Another loss of flexibility is the inability to send fast UDP-style messages in situations where message unreliability may not be a serious drawback. Operating system services often have this characteristic, but, surprisingly perhaps, so do some applications. Consider, for example, a large Monte Carlo calculation involving thousands of random walks. It is often the case that if a small fraction of these are, at random, lost (as a result of message loss), then the impact is "only" an increase in variance. If as a result of permitting such losses, the computation can run faster and hence have a greater sample size, then the benefit could outweigh the loss.

Reliability- for the sake of scalability. Although reliability was decidedly not an initial concern of this project, the project's own success forced the issue. The applications that most readily make use of DP are those which have a high computational cost and which are parallelizable. But when a parallelized application runs over a great number of workstations on a LAN for a long time, the likelihood of zero workstation reboots during the course of a single computation becomes uncomfortably low. In order to more fully realize the potential for the exploitation of networks and internetworks of workstations, greater reliability is essential. So, as the project developed, increased reliability (with respect to single workstation failure) became a goal.

In summary, the DP library was designed with the following goals:
  1. Flexibility and power: The primitives must provide the power to perform most distributed programming functions. The application programmer should not lose any functionality or efficiency by using DP instead of the native system primitives.
  2. Portability: The primitives should be implementable on most, if not all, distributed computing platforms.
  3. Reliability: The loss due to external circumstances of one or all processes on a single workstation should have no impact on the outcome of a distributed computation other than a short delay, provided that the processes involved do not conduct any i/o other than message sending and receiving.
With the exception of not providing a broadcast or multicast message facility, DP meets these goals. On the other hand, in comparison to other distributed programming environments and languages, DP provides a very low-level application interface. There is no typing of messages, minimal support of data format conversions, no queuing of synchronous messages, and no concept of conditional receives. There is, however, a higher-level distributed programming support environment, stdDP [Arnow94], that provides those services and is implemented using DP.

(2.1) A sketch of a motivating application: capacitated warehouse location

To clarify the kind of problem that demands the interrupting message facility that is absent from other environments, we present one example: the capacitated warehouse location problem- a classic operations research problem.

The goal is to supply the needs of customers from a group of proposed warehouses and to minimize monthly cost. Using a warehouse requires a fixed monthly charge and there is the cost of supplying all or part of a customer's requirements from a given warehouse. The problem is to determine which warehouses to open so as to minimize cost while meeting demand. Although the problem is NP-hard, good results can be achieved using Branch-and-Cut and Branch-and-Bound techniques. Worker processes are given portions of the search tree to explore and communicate intermediate results to one another. Idle worker processes are given new tasks by master processes, which must obtain these from busy worker processes. Worker processes can complete their tasks significantly more rapidly through pruning by "knowing" the current global minimal cost.

Both of these operations- obtaining the new global minimum and paring the current subtree to define and transmit a new task- are in response to arriving messages, the number of which is unknown. Polling, besides being inefficient, requires an inordinate modification of the original code. In both of these cases, efficiency and convenience are served by providing interrupting messages. This application and other related ones are described in [Arnow91, Arnow94, Arnow95].

(3) DP's services

Although of the same genus as environments such as PVM and P4, DP differs from each of them in a number of important ways, primarily because of the above goals. Functionally, the most important difference is its provision for unsolicited messages whose arrival generates a software interrupt. This provides a flexible way of delivering large quantities of urgent information, something that cannot easily be accomplished with, say, the Unix-signal transmission of PVM. It also allows a programming environment to provide a shared-memory-like capability. DP permits messages to be sent in the cheapest way possible, when reliable transmission is not necessary. Messages can be received with or without blocking. Process creation is dynamic and limited only by available computing resources. Furthermore, DP can guarantee the reliability of those processes that engage in no i/o other than message sending and receiving in the event of a single workstation failure. This section presents an overview of most of the services provided by DP.

(3.1) Process management

Execution. Execution of a DP application starts by invoking a suitably compiled and linked DP executable program. In DP parlance, this process is called the primary, though its primacy is for the most part just a matter of being first- there is nothing very special about the primary. The primary process and its descendants (those processes that are spawned by it or its descendants) constitute a DP process group. DP processes can only communicate within this group.

Identification. Each DP process is identified by a value of type DPID, guaranteed to be unique among all possible DP processes. The function dpgetpid() returns the current process's id via a storeback parameter. Processes learn the DPIDs of other processes either by being their parent, by receiving a message from them, or by extracting a DPID carried in the contents of a message.

The hosts file. In order to spawn processes, a DP program must have information about the available hosts for process creation. DP processes can acquire that information dynamically but it is usually convenient to provide that information to the primary via a hosts file in the directory from which the DP application is executed. The primary will automatically read this file and prompt the user for the passwords needed to access the networks named. This information is inherited by spawned processes.

The hosts table. Host information is maintained internally in a hosts table. In DP application code, hosts are identified using integer indices to this table. The table is dynamic- new hosts can be introduced during run-time by the function dpaddhost(). This call is an alternative and a supplement to providing host information through a static, though convenient, hosts file.
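
For instance, a process might add a host at run time and then spawn onto it. This sketch uses the dpaddhost() signature from the appendix; the host, domain, path, user and password values are entirely illustrative:

	int	n;

	n = dpaddhost("sparc9", "sci.brooklyn.cuny.edu",
			"/usr/local/dpapps", "guest", "secret");
	/* the new host's index is presumably n-1, since host	*/
	/* ids are 0-based indices into the table, as in the	*/
	/* primes example of section 4							*/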

Process creation. Processes are spawned by calling dpspawn(), passing the name of the program to run, the id of the host on which to run it, and a semantic packet: a block of application data that gives the new process its initial parameters. The call to dpspawn() returns the DPID of the new process via a storeback parameter. The entry point for the newly spawned secondary processes is main(), not the instruction after the call to dpspawn(). These secondary processes do not have access to the original command-line arguments nor do they inherit a copy of the creator's address space- their data must come from the semantic packet or from subsequent received messages. Guiding dpspawn() is the entry in the internal host table for the given host id. That entry, among other things, determines the user name under which the new process will run and, most significantly, the directory in which the program must be found and in which the new process will start executing.
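
A minimal spawn, then, looks like the following sketch; the program name "worker", the variable hostid and the packet contents are illustrative, and the call form follows the primes example of section 4:

	struct semstr { DPID s_id; } s;	/* the semantic packet			*/
	DPID	child;

	dpgetpid(&s.s_id);				/* tell the child who we are	*/
	dpspawn("worker", &child, hostid, (char *) &s, sizeof(s));
	/* the child starts at main() on host #hostid; its own	*/
	/* dpinit() will store back a pointer to a copy of s	*/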

Initialization. The first DP call any DP program makes should be dpinit(), which sets up the necessary process and communication environment. This includes initialization of DP's data structures and establishing an address. If the process calling dpinit() was created by dpspawn(), the caller is given access to the semantic packet, described above. A pointer to this packet is returned via a storeback parameter to dpinit(). The size of the packet is also stored back. In the case of a primary process, there is no semantic packet and the size stored back is -1: that is how the code can determine whether it is running in the primary process or a secondary after a call to dpinit().

The call to dpinit() completes the handshaking with the creating parent process. The creating process cannot continue its work until the created process makes this call. For this reason, the call to dpinit() should be made as soon as possible. Upon returning from dpinit(), the process is a genuine DP process and can partake in the activities of the DP family.
In all cases, dpinit() returns the number of host machines in its inherited internal host table and stores back the host id of the machine on which the process is running.
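
The resulting start-of-program idiom is sketched below, assuming the semantic packet type struct semstr of the spawning sketch above; av[0] is main()'s program name:

	struct semstr	*sp;
	int	size, myhostid, nhosts;

	nhosts = dpinit(av[0], (char **) &sp, &size, &myhostid);
	if (size == -1) {
		/* primary: no semantic packet; read hosts, spawn	*/
	} else {
		/* secondary: sp points at the packet from dpspawn()	*/
	}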

Joining a DP process group. Any non-DP process may join an existing DP process group. For this to be possible, one or more of the processes in the group must invoke dpinvite(). This call creates a contact file, which contains all the information that a new process would normally get from dpspawn(). All the joining process need do is invoke dpjoin() with the pathname of the contact file as an argument. This call plays the role of dpinit() and establishes communication using information provided in the contact file. The newly joined DP process's identity can then be conveyed to any process in the group. Note that this mechanism requires that the joining process and the inviting process share some file address space.
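
A joining process, in outline; the contact-file pathname is hypothetical, and it is assumed here that dpjoin(), like most DP calls, returns DPFAIL on failure:

	/* some member of the group has already called			*/
	/* dpinvite(), creating the contact file named here		*/
	if (dpjoin("/shared/dp/contact") == DPFAIL)
		exit(1);		/* not yet a DP process, so exit() is safe */
	/* now a full DP process: may send, receive and spawn	*/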

Finishing Up. All DP processes must call dpexit() to make a graceful exit. The dpexit() function is the DP substitute for the Unix exit() call; that is, it makes a no-return exit. If a DP process fails to exit using dpexit(), i.e. if it exits using the Unix exit(), other DP processes in the application may fail. The main purpose of dpexit() is to withdraw the exiting process from contact with the remaining DP processes prior to an actual exit in a way that guarantees correct message transmission. The only argument to the function is a string identifying the reason for termination. The string appears only in the log file for the process and may be null.

Sometimes, it may be desirable for a process to cease DP activity but persist in some other activity. By passing the address of a function to dpsetexfun() any time prior to calling dpexit(), a process guarantees that dpexit(), after withdrawing from the group of DP processes, will call the indicated function prior to doing the actual Unix exit.
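
For example (a sketch; the post-DP activity is arbitrary):

	void
	wrapup() {			/* runs after withdrawal from the DP group	*/
		/* ... any further, purely local activity ...		*/
	}
	...
	dpsetexfun(wrapup);
	dpexit("done");		/* withdraws, calls wrapup(), then exits	*/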
Bailing out. The dpexit() call terminates one DP process in the group. Generally, each process's own logic dictates when that termination is appropriate. In exceptional circumstances, it may be necessary to allow a single process to force termination of the entire group. In such a case, dpstop() can be called. The dpstop() call forces immediate shutdown of all processes. The function set by dpsetexfun() is not called and the ensuing shutdown is so radical that even earlier messages that had been sent but were not yet delivered may be thrown away. The function receives one argument, a string, which has the same meaning as the string passed to dpexit().

(3.2) Communication

Sending messages. DP processes communicate by sending and receiving messages. For sending messages, the dpwrite() routine requires the DPID of the recipient and a pointer to the start of the message body along with the message body size. A variant, dpsend(), allows a message body to be specified as a linked list. Messages can be reliable or non-reliable and interrupting or non-interrupting. Reliability here means that DP, which, as I describe in section 5, uses UDP as its underlying protocol, will carry out an ack/timeout/retransmit protocol that will guarantee the eventual availability of the message to the target provided that the underlying network and relevant host machines do not fail. Reliable messages are received in the order in which they were sent. Sending the message unreliably means that DP will send the message to the target only once and assume no further responsibility- a much cheaper method of message transmission.
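
The mode argument of dpwrite() is formed by or-ing one flag from each pair in the appendix header; all four combinations are legal. In this fragment, id, buf and n stand for a destination, a buffer and its length:

	dpwrite(&id, buf, n, DPREL|DPRECV);		/* reliable, queued			*/
	dpwrite(&id, buf, n, DPREL|DPGETMSG);	/* reliable, interrupting	*/
	dpwrite(&id, buf, n, DPUNREL|DPRECV);	/* cheap, queued			*/
	dpwrite(&id, buf, n, DPUNREL|DPGETMSG);	/* cheap, interrupting		*/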

Regardless of whether the message is sent reliably, return to the sender is immediate; the sending process is never blocked waiting for transmission. So upon return from dpwrite(), only one thing is certain: the target has not yet received the message.

Receiving messages. Logically, each DP process has two receiving ports: one for receiving interrupting messages and another for receiving non-interrupting messages. Non-interrupting messages are queued upon arrival and do not affect the receiving process until it explicitly reads the message with the dprecv() call. In the case of an interrupting message, the message's arrival may force the invocation of a special message-catching routine if such a routine has been designated by the receiving process via a call to dpcatchmsg(). Whether or not such a routine has been designated, the interrupting message must be read explicitly with the dpgetmsg() call, not the dprecv() call. Both routines return the DPID of the sender as well as the message itself and both routines move the incoming message from an internal DP buffer to an application-provided buffer. If the latter is insufficient to hold the message, the message is truncated. The dprecv() call can be made with or without blocking semantics, but the dpgetmsg() call, because it is typically used inside an interrupt handler where blocking would be inappropriate, never blocks. In the event that several interrupting messages arrive before the system has had a chance to invoke the message handler function, only one call to the message handler will be made, i.e., there is not a one-to-one correspondence between interrupting messages and calls to the handler. Hence, the message handler must assume that there may be more than one interrupting message ready to be received.
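
Two receiving idioms follow from these rules: a non-blocking drain of the queued port, and a catcher that drains the interrupting port in a loop, as fcatch() does in the primes example of section 4. In this sketch, MAXMSG and handle() are hypothetical application names:

	DPID	src;
	char	buf[MAXMSG];

	/* queued port: poll without blocking					*/
	while (dprecv(&src, buf, sizeof(buf), DPNOBLOCK) == DPSUCCESS)
		handle(buf);

	/* interrupting port: always drain in a loop, since one	*/
	/* handler invocation may cover several arrivals		*/
	while (dpgetmsg(&src, buf, sizeof(buf)) != DPNOMESSAGE)
		handle(buf);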

Longjumps. Sometimes when the message-catching routine is invoked, it responds to the incoming information by modifying a global data structure or sending out a message with requested information. At other times, however, it must respond by making an exceptional change in the control flow of the receiving process. The dplongjmp() routine provides that capability. It works exactly as longjmp() does and in fact its argument is a jmpbuf that was set by setjmp() (there is no "dpsetjmp"). The only reason for dplongjmp() (instead of the standard longjmp()) is that the jump out of the message handler must be accompanied by a re-enabling of interrupting messages.
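
In outline, and in the style of the branch-and-bound application of section 2.1; best_bound and the pruning test are hypothetical, and the jmpbuf type used by dplongjmp() is assumed to be visible via dp/dp.h:

	jmpbuf	restart;			/* set by setjmp() in the main line	*/
	int	best_bound;

	void
	catcher() {
		int	b;
		DPID	src;

		while (dpgetmsg(&src, (char *) &b, sizeof(b)) != DPNOMESSAGE)
			if (b < best_bound) {
				best_bound = b;
				dplongjmp(restart, 1);	/* abandon current subtree	*/
			}
	}
	...
	dpcatchmsg(catcher);
	if (setjmp(restart))
		;	/* control resumes here after a better bound arrives */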

(3.3) Synchronization and timeouts

Critical sections. The application-specified message-catching routine may be invoked at any time and may reference global objects. Thus, any other code that accesses these global objects is a one-way critical section, in the sense that, though control may transfer from the critical section to the handler upon receipt of an interrupting message, the reverse is not possible: control will not pass from the handler until it has completed its work and returns. To guarantee mutual exclusion, such access should be preceded by a call to dpblock() to disable calls to the interrupt handler and followed by a call to dpunblock() to re-enable them. Upon invoking dpunblock(), if any interrupting messages arrived since the call to dpblock(), the catching function will be invoked.

Synchronization and Timeouts. Sometimes a process needs to wait until some condition becomes true, typically as a result of incoming interrupting messages. The dppause() call suspends execution of the process until any asynchronous event takes place. The application may set a timer and a timeout function through non-zero arguments to this call. Upon entering dppause(), interrupting messages (and calls to the message catcher) are enabled and status is restored upon return. Typical use of this function is
dpblock();
while (!some_desired_condition)
		dppause(0, (FUNCPTR) 0);
dpunblock();
The intent of this code is not to proceed until some_desired_condition, which presumably depends on the arrival of a message, is true. Rather than busy-wait, the program calls dppause() which will not return until some event, possibly a message arrival, has taken place. Because many events are possible, the desired condition has to be rechecked and dppause() reentered if necessary. The window between the checking of the desired condition and entry into dppause() opens the possibility of a race condition and so the loop is enclosed by calls to dpblock() and dpunblock().

(3.4) Restrictions and Application Front Ends

Except for processes that use dpjoin() to join a DP process group, standard input/output/error are not available to the DP application. Thus dpjoin() is essential if interactive programs are desired. Message-catching functions may not call dprecv() in blocking mode.

Timing. All system calls and standard subroutines that are implemented using the Unix alarm system call (or its variants) are not allowed because they would interfere with DP's own reliance on this facility. That includes: sleep, alarm, ualarm. To restore some of this functionality to the application writer, there is a special DP routine, dpalarm(t,f), which arranges for function f to be invoked after t milliseconds.
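
For example, a periodic progress report can be arranged by re-arming the alarm inside the handler, assuming (as the description suggests) that dpalarm() is one-shot; report() is a hypothetical application routine:

	void
	report() {
		/* e.g. send a status message to the primary	*/
		dpalarm(60000L, (FUNCPTR) report);	/* re-arm for a minute hence */
	}
	...
	dpalarm(60000L, (FUNCPTR) report);		/* 60000 ms = one minute	*/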

Asynchronous and signal-driven i/o. Using the BSD select() system call or making use of the SIGIO signal is forbidden.

Exec and fork. Use of any of the exec variants is forbidden, unless used in conjunction with fork() or after dpexit() has been called. The fork() system call can be used provided that the children do not attempt to partake in the execution of DP routines. Child processes (but not the parent) may do execs.

Application front ends. These restrictions might initially seem daunting to the application writer. However, it is always possible for non-DP processes, such as one intended to support an event-driven user interface front end, to fork a child process which uses dpjoin() to become a DP process or even which uses dpinit() to become a DP primary process. The non-DP parent and the DP child can communicate using pipes or SysV IPC.
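
In outline; the contact-file path and relay() are hypothetical:

	int	fd[2];

	pipe(fd);
	if (fork() == 0) {				/* child becomes a DP process	*/
		dpjoin("/shared/dp/contact");
		relay(fd[1]);				/* relay(): hypothetical routine that	*/
									/*  shuttles DP traffic up the pipe		*/
		dpexit("front-end worker done");
	}
	/* parent remains non-DP: free to use stdio, select(),	*/
	/* signals and an event loop							*/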

(4) Examples

A simple example: primes. The primes program, shown below, illustrates the use of the DP interface. The primary process uses dpcatchmsg() to arrange for fcatch() to be invoked in the event of an interrupting message and then spawns two processes for every available host, sending a semantic packet containing just the DPID of the primary to each secondary process. It divides the interval 1..100000 equally among all the processes, including itself, and then uses dpwrite() to send the lower bound of each subinterval to each process in a reliable, non-interrupting message (DPREL|DPRECV). The primary then searches for primes in its own subinterval.

Meanwhile, the secondary processes have started and, having received their lower bound by calling dprecv(), they too start searching for primes in their own subintervals. Both secondary and primary processes invoke newprime() when a prime number is found. For the primary, newprime() just adds the prime to the set of primes- this is a critical section because an interrupting message may access the same set and so must be protected with dpblock() and dpunblock(). For the secondaries, dpwrite() is used to send the prime number in a reliable, interrupting message (DPREL|DPGETMSG). The arrival of these messages causes fcatch() to be invoked, and the incoming prime number to be stored in the set of primes.

To let the primary know that no more primes are forthcoming, secondaries send a negative integer in a reliable interrupting message and then exit. The primary waits till it has received the appropriate number of such messages and then exits.

#include		<stdio.h>
#include		<dp/dp.h>

struct semstr {				/* most programs would have				*/
	DPID	s_id;				/*  other fields here as well			*/
} s, *sp;

#define		MAXPRIMES		100000
int	p[MAXPRIMES], np=0, IsPrimary,
	nprocs, nhosts, done=0,
	interval, myhostid;

#define	RelInt	(DPREL|DPGETMSG)
#define	RelNonInt	(DPREL|DPRECV)

void
sendint(DPID *dest, int i, int mode) {
	dpwrite(dest, &i, sizeof(i), mode);
}

void
newprime(int n) {
	if (IsPrimary) {
		dpblock();					/* potential race condition		*/
		p[np++] = n;				/*    so block interrupts		*/
		dpunblock();
	} else
		sendint(&sp->s_id, n, RelInt);
}

void
fcatch() {
	int	v;
	DPID	src;

	while (dpgetmsg(&src,&v,sizeof(v))
				!=DPNOMESSAGE)
		if (v<0)
			done++;
		else
			p[np++] = v;
}

int
IsPrime(int n) {			/* supplied here for completeness:	*/
	int	d;					/*  any primality test will do		*/

	if (n<2)
		return 0;
	for (d=2; d*d<=n; d++)
		if (n%d == 0)
			return 0;
	return 1;
}

void
search(int n1,int n2) {
	int	i;

	for (i=n1; i<=n2; i++)
		if (IsPrime(i))
			newprime(i);
}

void
primary(char *prog) {
	int	i=1, v=0;
	DPID	child;

	dpcatchmsg(fcatch);
	dpgetpid(&s.s_id);
	while (i<nprocs) {
		dpspawn(prog, &child, i%nhosts,
					&s, sizeof(s));
		sendint(&child, v, RelNonInt);
		v += interval;
		i++;
	}
	search(v,MAXPRIMES);
	done++;

	dpblock();					/* potential race condition: so	*/
	while (done<nprocs)		/*  block interrupts					*/
		dppause(0L, NULLFUNC);
	dpunblock();
								/* write primes to results			*/
	output("results", p, np	);	
	dpexit("You're fired!");
}

void
secondary() {
	int	v;
	DPID	src;

	dprecv(&src, &v, sizeof(v), DPBLOCK);
	search(v, v+interval-1);
	sendint(&sp->s_id, -1, RelInt);
	dpexit("I quit!");
}
							/* main: executed by all processes		*/
main(int ac, char *av[]) {	
	int	size;

	nhosts = dpinit(av[0], &sp,
					&size, &myhostid);
	IsPrimary = size==(-1);
	nprocs = 2*nhosts;		/* 2 processes per host				*/
	interval = MAXPRIMES/(nprocs+1);
	IsPrimary? primary(av[0]) : secondary();
}

Capacitated warehouse location problem, again. A branch-and-bound search for solutions can be efficiently parallelized using DP. N processes are created, N being determined by available hardware. The primary maintains a set of unsearched subtrees- initially this set is taken from the top N subtrees of the search tree. When a secondary process becomes idle, it sends a reliable interrupting message to the primary requesting a subtree and then waits until it receives one. When the primary's set of unsearched subtrees falls below a low-water mark, it sends reliable interrupting messages to all the active secondaries, requesting that they split their subtree at the next convenient point. These will continue to split their subtrees, sending (in reliable interrupting messages) the split-off branches to the primary to replenish its set, until the primary, having passed a high-water mark, sends them reliable interrupting messages to desist. This is very effective load-balancing. The availability of interrupting messages here is essential because of the unpredictability of need and availability of search subtrees on the one hand and the undesirability of frequent polling on the other.

An important element of the branch-and-bound search algorithm is the ability to prune search subtrees when the best extremum the subtree can offer is inferior to the best extremum already encountered. In a shared memory environment, all processes have memory access to the best extremum but in a message-passing network environment making sure this information is rapidly available to all processes is both necessary and non-trivial. Using DP, this problem is efficiently addressed as follows. Whenever a secondary discovers what appears to be a new best extremum it sends a reliable interrupting message to the primary, which multicasts this in lightweight (unreliable) messages to all the other secondaries. Making these messages interrupting guarantees that the information will become available to the receiving process as quickly as the underlying system permits. Using lightweight messages ensures that the multicasts will not overload the system nor overburden the primary. The cost of occasionally losing such a message is minor: it simply means that occasionally, for some usually small duration, a secondary may not be pruning its subtrees as effectively as it would otherwise.
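
The primary's multicast amounts to a loop of cheap interrupting writes. A sketch, in which worker[], nworkers and MAXWORKERS stand for the primary's own bookkeeping:

	DPID	worker[MAXWORKERS];		/* filled in as workers are spawned	*/
	int	nworkers;

	void
	relay_minimum(int cost) {
		int	i;

		for (i = 0; i < nworkers; i++)
			dpwrite(&worker[i], (char *) &cost, sizeof(cost),
					DPUNREL|DPGETMSG);	/* cheap and interrupting	*/
	}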

(5) Implementation

DP is implemented using the socket system call interface to the TCP and UDP services of the TCP/IP protocol suite, basic Internet services such as ping and rexec and, of course, a host of Unix services. Once the basic implementation issues were decided all of these services were used in the obvious way.

Communication mechanism. The first issue to be decided was how inter-process communication is to be handled. PVM and many other environments use TCP. This is a very attractive choice given that much inter-process communication has to be reliable and that TCP handles this within the OS kernel. Building a reliable service using UDP requires duplicating much of this outside the kernel with all the context-switching cost that this implies. Nevertheless, DP's inter-process communication is almost entirely implemented using UDP, for two reasons: UDP makes it possible to offer genuinely cheap, unreliable messages, and it avoids the per-connection state and descriptor costs that limit how well TCP-based systems scale. Using UDP does require DP to guarantee reliable sequenced delivery of those messages that require this service. In the current implementation, reliable messages are implemented in the most naive way (with sequence numbers, positive acks, timeouts, retransmits and a notion of "stale" messages). More sophisticated implementations are certainly possible and in the still "gray" part of the DP interface, there are calls that allow the programmer to adjust protocol parameters such as timeout size.
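
To give the flavor of the naive protocol, here is a sketch of the receiver's stale-message test; the data structures are illustrative, not DP's actual internals. Since reliable messages from a given sender are delivered in order, a message is stale exactly when its sequence number does not exceed the last one delivered:

	/* last_delivered[s]: highest sequence number yet	*/
	/* delivered from sender s							*/
	if (seq <= last_delivered[sender])
		send_ack(sender, seq);	/* stale: re-ack, don't redeliver	*/
	else {
		deliver(msg);
		last_delivered[sender] = seq;
		send_ack(sender, seq);
	}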

Process identification. It would have been desirable for DPIDs to be integers or some other basic C type. However, that requires some kind of id-to-address mapping internal to DP. A problem arises when an application DP process references a DPID for which its own DP runtime support does not have a mapping. This could and does arise when DPIDs are sent in application messages. To eliminate any need for a centralized or distributed id resolution mechanism, DPIDs are not integers but 28-byte structures containing all the information needed to address the corresponding process and more. The additional information represents "the kitchen sink". Some of it, in retrospect, has turned out to be useful- other components (indicating what protocol- for example IPX- is involved) may never find a use.

There are parallel methods which naturally assign integers to a set of processes and use these assignments in their algorithms. As it turns out, the use of a non-basic type does not pose much of a problem in those cases. Programs which use such methods do not spawn processes dynamically but rather use a fixed number of processes created from the outset. The need to create an initial DPID-to-integer map is only a minor inconvenience to the application writer, and there are libraries, such as stdDP [Arnow94], built on top of DP, that provide this service along with others. From an esthetic point of view the chief regret with this choice is the necessity for providing a dpidmatch() function. On the other hand, from an implementation point of view, things are greatly simplified.
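
Since DPIDs are opaque structures, equality must be tested with dpidmatch(), and a DPID-to-integer map is then a small lookup. A sketch, in which NPROCS is a hypothetical constant and rank_tab is filled in by the application at startup:

	DPID	rank_tab[NPROCS];

	int
	rank_of(DPID *id) {
		int	r;

		for (r = 0; r < NPROCS; r++)
			if (dpidmatch(id, &rank_tab[r]))
				return r;
		return -1;				/* unknown process	*/
	}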

Process creation and initialization. Processes are created using the rexec service. In order to spawn a process, the creator, after checking the arguments to dpspawn() as best it can, forks a child process which does most of the work. The child process calls rexec and uses the resulting TCP connection to deliver the semantic packet and the internal hosts table to the newly spawned process. It uses the same connection to receive the new process's DPID (which contains, among other data, its UDP address). This is the only use of TCP in the implementation. The parent receives the DPID from the child through a pipe and waits for the child process to complete the handshaking with the new process and disappear, along with its TCP connection. This arrangement avoids the need for a separate call by the parent to recognize the completion of the process creation. Although it compels the parent to wait until the new process is created, the creator is still able to receive and respond to interrupting messages- an allowance which is made much easier by having a child process do most of the work.



Reliability. The scheme for enhancing reliability is inspired by one used, in a different context, in the early 1980s in the design of a fault-tolerant Unix box based on a shared memory architecture [Borg83]. Each active process is created with a backup process residing on a different workstation. The workstation housing a backup must be binary compatible with that executing the active process. Furthermore, because of the way recovery is implemented, each pair of active and backup processes must share some file address space in common (though between two distinct pairs there is no such need). The scheme only guarantees against single workstation failure, though it may work in the event of multiple failures. What it requires is that the workstation holding the backup process not fail. So in the figure below, had the backup of process C been executing on an additional machine, say Sparc#4, rather than on Sparc#2, then both Sparc#1 and Sparc#2 could have failed simultaneously without stopping the computation.

[Figure: active processes and their backups distributed over several workstations; process C's backup resides on Sparc#2.]

When process A sends a message to process B, process B uses the message and sends a copy of the message to its backup, B'. If the message is a reliable message, B' sends the acknowledgment to A. Thus A continues to retransmit in the usual way until it is certain that both B and B' have the message. Redundant transmits to B cause no problem because they are simply stale messages which an ack/timeout/retransmit protocol would ignore anyway. B' saves all messages (until, as described below, a checkpoint operation takes place). Upon detection of failure (see below), B' starts executing. Since it has all the messages (with the possible exception of a few unreliables) that B received, its execution will be identical. It will send output messages that are redundant to ones sent by B previously, but these will be treated as stale by their recipients and not cause any inconsistency.

To preserve the total order of messages as B received them, B assigns each newly received message an internal sequence number. This number is passed on with every copy of that message that B sends to B'.

Failure is detected in an ad hoc fashion. Process B periodically sends dummy messages to its backup, B'. If B' does not hear from B after a time, it pings B's machine. If there is no response after a number of tries, B' takes over from B.

Recovery begins with B' sending a recovery message to B's parent and every process that B has interacted with. Processes receiving such messages revise their internal DP process tables appropriately and propagate these messages to all other processes they in turn have communicated with. The application code layer never learns about this and will continue to use the original DPID of B, which will now be mapped to that of B' by the DP implementation. Delay in this propagation of recovery messages poses no problem because messages sent to B will remain unacknowledged and hence be retransmitted, eventually, to B'. Furthermore, even if B has only failed temporarily (the transceiver cable fell out, say) and comes back to life in the middle of the recovery, there still will not be a problem since it always forwards any messages that it receives to B'. Any messages that a temporarily reincarnated B would send will either be stale or cause the equivalent B' message to be treated as such.

Following the transmission of recovery messages by B' is the roll-forward phase. To avoid the delay that would result from having to roll forward from scratch, active processes periodically (and transparently to the application) take checkpoints, in which DP writes out the process's entire stack and data segment to disk. (This is why the active/backup pair must have some file space in common.) At the outset of the roll-forward phase of recovery, these segments are copied into the address space of process B', so the roll-forward starts from the last checkpoint. These checkpoints are done quite infrequently, on the order of every 10-15 minutes. The rationale is that anyone running a computation on a group of workstations in which one fails should be grateful to have a recovery delay of only 15 minutes. The fault-tolerance is intended for computations that run for hours, not minutes.

To reduce excess message traffic resulting from redundant messages sent by B' in its roll-forward phase, the active process keeps B' informed as to the number of messages it has sent out to each process. During roll-forward, these counts are decremented and no messages are actually sent out to a given process until its corresponding count has reached 0.
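
In effect, the backup's send path during roll-forward behaves like this sketch; the names are illustrative:

	/* sent_count[p]: messages the failed process had		*/
	/* already sent to p, as reported before the failure	*/
	if (sent_count[target] > 0)
		sent_count[target]--;		/* already delivered by B: suppress	*/
	else
		really_send(target, msg);	/* beyond B's progress: transmit	*/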

This scheme necessarily requires a number of restrictions on the activities of the active processes. One severe restriction is that they cannot be doing I/O other than DP message transmission (or if they do I/O, its integrity can't be guaranteed).

(6) Performance

A number of comparisons of DP's performance with that of PVM have been made. One test involves a ring of processes passing a single message from one to the other. Another involves a set of processes, each of which is sending and receiving messages symmetrically to all the others.

In each test, a DP and an equivalent PVM program were run simultaneously, in the same environment (same machines, same network, same directories, etc.). Each program was given a timeout value and the number of messages passed at that point was measured. Table 1 shows the ratio of messages sent in the DP program to that of the PVM program.

Table 1. Comparing DP and PVM

	Processes	DP/PVM: ring	DP/PVM: set
	    8		    1.33		   0.82
	   16		    1.34		   0.88
	   64		    1.23		   0.81
	  100		    1.36		   0.90
	  112		    1.37		   1.78


These results suggest DP performs comparably to PVM, that DP may scale better (probably because it does not rely on TCP connections) and that a more thorough performance study is desirable.

(7) Portability

DP is implemented on SunOS, Solaris, AIX and on DEC RISCstations. Earlier versions were implemented on the Alliant FX-8 and the KSR-1. A Windows-NT implementation is underway as is a port to NetBSD.
Apart from its reliability enhancement, DP is very undemanding of the underlying system. It requires the ability to spawn remote processes, send messages without blocking and have interrupt-driven input. The reliability enhancement described above has been implemented on SunOS only. Presumably it could be carried out in most Unix environments.

(8) Retrospect and Prospects

There seems to be a gap between system designers and application programmers in the area of parallel distributed programming. In this project, I started out wearing an application programmer hat. I had a set of requirements. There was no library then that came close to meeting them and even now no other library meets all of them. At the outset, I had no plan, for example, to provide dynamic process creation. As soon as I wore the system designer hat for a while, that seemed to be a great weakness in the design. It seemed that the flexibility to start with only a few processes and then, as new tasks are identified, create additional ones is crucial. Ideally, the programmer would design the process structure of the application to mirror the logical task structure of the problem. After taking the trouble to provide this facility, I was quite chagrined to find that most of the DP users simply assess the number of machines that they have available, choose a number of processes about twice that number, and let them run, using a "worker parallelism" paradigm, in which worker processes are given or pick up tasks as they become idle. The reason for this is understandable. Available hardware is the determining factor in the plans of these practitioners. As I turned to using DP myself, I found I was doing the same thing. Is dynamic process creation really worth the trouble?

On the other hand, the interrupting message facility and the unreliable messages are used extensively. The former in particular has been seen to simplify the parallelization of existing code, by eliminating the need for finding the places in the code to put receives. The interrupting message handler takes care of that. Message sends still need to be inserted into the existing code, but somehow it is easier to identify the points where there is a result to brag about (to other processes say) than to identify receives. In cases where the reverse is true, then requests for data can be placed at the appropriate points (send request in an interrupting message, wait for response) and the actual sending of results can be driven by the arrival of these interrupting messages.

The implementation of single processor fault-tolerance invites an effort to undertake process migration and load balancing. Whether the admittedly heavy-handed fault-tolerant scheme used here is efficient enough for that remains to be seen.

(9) Availability

DP runs on Sun SPARCstations, DEC RISCstations and on IBM RS/6000s with C and Fortran interfaces. It is, along with documentation and some utilities, available from the author.

(10) Acknowledgments

While an undergraduate at Harvard, Haibin Jiu spent two of his summers assisting in this effort. Jim Basney, a student at Oberlin spent a "winter term" on this as well. I especially would like to acknowledge the work of Jerry Chen, who while working on his doctorate at CUNY implemented an early version of DP on the KSR-1 and with whom I have had many valuable conversations.

(11) References

Andrews, G.R.: The distributed programming language SR-- mechanisms, design and implementation. Software- Practice and Experience 12,8 (Aug. 1982).
Arnow, D.M.: Correlated Random Walks in Distributed Monte Carlo Programs. ICIAM 91, Washington D.C. (July 1991).
Arnow, D.M.: StdDP- a layered approach to distributed programming libraries. T.R. 94-11 Dept. of CIS, Brooklyn College (1994).
Arnow, D.M., McAloon, K.M. and Tretkoff, C.: Distributed programming and disjunctive programming. Proceedings of the Sixth IASTED-ISMM Int. Conf. on Parallel And Distributed Computing And Systems Washington D.C. (October 1994).
Arnow, D.M., McAloon, K.M., and Tretkoff, C.: Parallel integer goal programming. To appear in the 23rd ACM Computer Science Conference, Nashville (1995).
Auerbach, J., Kennedy, M., Russell, J., and Yemini, S.: Interprocess communication in Concert/C. T.R. RC 17341, IBM Watson Research Center, Yorktown Heights, (1992).
Bal, H. E., Steiner, J. G., and Tanenbaum, A.S: Programming languages for distributed computing systems. Computing Surveys 21,3 (Sept. 1989).
Borg, A., Baumbach, J., and Glazer S.: A message system supporting fault tolerance. 9th ACM Symp. on Operating Systems Principles. Bretton Woods, New Hampshire, (Oct. 1983).
Butler, R., and Lusk, E.: User's guide to the P4 programming system. Tech. Rep. ANL-92/17, Argonne Nat. Lab. (1992).
Chen, J.: Distributed Green's function Monte Carlo calculations. Ph.D Thesis, Dept. of CS, CUNY (1994).
Clark, K.L.: PARLOG and its applications. IEEE Transactions on Software Engineering SE-14, 12 (Dec. 1988).
Douglas, Craig C., Mattson, Timothy G., and Schultz, Martin H.: Parallel programming systems for workstation clusters. Yale University Dept of CS Technical Report, (Aug., 1993).
Geist, G.A. and Sunderam, V.S.: PVM- Network-based concurrent computing on the PVM system. Concurrency: Practice and Experience 4(4) (Jun., 1992).
Gelernter, D.: Generative communication in Linda. ACM Transactions on Programming Languages and Systems 7, 1 (Jan. 1985).
Goldberg, Arthur P.: Concert/C Tutorial: An Introduction to a Language for Distributed C Programming, IBM Watson Research Center, Yorktown Heights, (Mar., 1993).
Hoare, C.A.R.: Communicating sequential processes. Communications of the ACM 21,8 (Aug. 1978).
Marsland, T.A., Breitkreutz, T., and Sutphen, S.: A network multi-processor for experiments in parallelism. Concurrency: Practice and Experience, 3(1), (1991).
MPI Forum: Message Passing Interface Standard (Draft). Oak Ridge National Laboratory. (Nov. 1993).
Parsons, I.: Evaluation of distributed communication systems. Proceedings of CASCON '93, Vol 2. Toronto, Ontario, Canada (Oct. 1993)
Strom, R.E. and Yemini, S.: NIL: An integrated language and system for distributed programming. SIGPLAN Notices 21, 10 (Oct. 1983).
Sunderam, V.S.: PVM- A framework for parallel distributed computing. Concurrency: Practice and Experience 2 (1990).

Appendix: The DP interface header:

/* SEND FLAGS													*/
#define		DPRECV			0x00	/* not interrupting	 	*/
#define		DPGETMSG		0x01	/* interrupting	       */
#define		DPREL			0x00	/* guaranteed delivery	*/
#define		DPUNREL		0x02	/* no guarantee			*/

/* RECV MODES													*/
#define		DPBLOCK		0x00	/* Wait for message		*/
#define		DPNOBLOCK		0x01	/* Don't wait				*/

#define		DPSUCCESS		0		/* RETURN CODES			*/
#define		DPFAIL		(-1)
#define		DPNOMESSAGE	(-2)

#define		DPIDSIZE		28
typedef struct {
			char	dpid_info[DPIDSIZE];
	} DPID;

typedef	 void		(*FUNCPTR)();
#define NULLFUNC			((FUNCPTR) 0)

int dpinit(char *prog, char **semanticp, int *size, int *hostid);
	/* RETURNS: DPFAIL or # of available hosts */

int dpaddhost(char *hstn, char *dmnn, char *path, char *user, char *passwd);
	/* RETURNS:  number of hosts in host table */

void dpgethost(int hid, DPHOST *hptr);
	/* STORES BACK: host info for host #hid 				*/

int dpwrite(DPID *dest, char *data, int nbytes, int mode);
/* RETURNS: DPSUCCESS or DPFAIL or DPDESTDEAD			*/

int dprecv(DPID *src, char *data, int limit, int flags);
/* RETURNS: DPSUCCESS or DPFAIL or DPNOMESSAGE	 		*/

int dpgetmsg(DPID *src, char *data, int limit);
/* RETURNS: DPSUCCESS or DPFAIL or DPNOMESSAGE 		*/

int dpspawn(char *prog, DPID *newid, int hid, char *semantic, int size, int sendflag);
/* STORES BACK the id of the new process and
		returns DPSUCCESS or DPFAIL							*/

FUNCPTR dpcatchmsg(FUNCPTR f);
/* RETURNS NULLFUNC or ptr to previous catch function		*/

void dpexit(char *exitstrng);	
	/* removes process from DP group and exits 				*/
void dpgetpid(DPID *myid);
	/* STORES BACK: DPID of executing process 				*/
void dpalarm(long t, FUNCPTR f);
	/* set alarm for user's function f							*/
void dplongjmp(jmpbuf label, int rv);
	/* longjmp to label, return value rv						*/
void dppause(long t, FUNCPTR f);
	/* set alarm for user's function f and pause 			*/
int dpblock();			/* disable interrupts				*/
void dpunblock();		/* enable interrupts					*/
void dpsetexfun(FUNCPTR f);
	/* set f to be called when exiting							*/
void dpstop(char *stopmsg);
	/* abandon ship fast											*/
int dpidmatch(DPID *ip1, DPID *ip2);
	/* return true if match; else false						*/


