Rserve Java multiple threads (Unix)

Concept

In a Unix environment, a multithreaded Java application can share a single Rserve instance. Rserve forks a new process for each new connection, so every thread that opens its own connection gets its own R session. Each connection also has its own working directory. If that directory is not empty, it is retained even after the connection is closed.

Example Program

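The first step is to start the Rserve instance. The example uses port 1000; on most Unix systems, ports below 1024 require root privileges, so choose a higher port if you run Rserve as a regular user.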

R CMD Rserve --RS-port 1000

Here is the multithreaded Java code that uses the Rserve instance to service four Java threads. Each thread opens its own connection and fits a linear model.

package com.studytrails.rserve;

public class RServeMultiThreadClient {
	public static void main(String[] args) {
		RServeMultiThread thread1 = new RServeMultiThread(1000);
		RServeMultiThread thread2 = new RServeMultiThread(1000);
		RServeMultiThread thread3 = new RServeMultiThread(1000);
		RServeMultiThread thread4 = new RServeMultiThread(1000);

		thread1.start();
		thread2.start();
		thread3.start();
		thread4.start();

		try {
			thread1.join();
			thread2.join();
			thread3.join();
			thread4.join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

package com.studytrails.rserve;

import org.rosuda.REngine.REXP;
import org.rosuda.REngine.REXPMismatchException;
import org.rosuda.REngine.Rserve.RConnection;
import org.rosuda.REngine.Rserve.RserveException;

public class RServeMultiThread extends Thread {

	private int port = 0;

	public RServeMultiThread(int port) {
		this.port = port;
	}

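	@Override
	public void run() {
		RConnection c = null;
		try {
			// Each thread opens its own connection; Rserve forks a process for it.
			c = new RConnection("localhost", port);
			// The port number doubles as the sample size N.
			c.eval("N = " + port);
			c.eval("x1=rnorm(N)");
			c.eval("x2 = 1 + x1 + rnorm(N)");
			c.eval("y <- 1 + x1 + x2");
			c.eval("df <- data.frame(y,x1,x2)");
			c.eval("fit <- lm(y ~ x1 + x2, data = df)");
			// fit[[1]] holds the coefficients; element 2 is the coefficient of x1.
			REXP x1 = c.eval("fit[[1]][2]");
			System.out.println("Thread with port " + port + " result: "
					+ x1.asDouble());
			// Pause for five seconds before the connection is closed.
			Thread.sleep(5000);
		} catch (RserveException e1) {
			e1.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (REXPMismatchException e) {
			e.printStackTrace();
		} finally {
			// Close the connection so the forked Rserve process can exit.
			if (c != null) {
				c.close();
			}
		}
	}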
}
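
The client above creates and joins four threads by hand. As a possible variation, the same fan-out can be sketched with a fixed thread pool from java.util.concurrent. This is only a sketch, not part of the original program: the class name PooledClientSketch and the method runAll are hypothetical, and the task passed in main is a placeholder that stands in for the Rserve call so that the example runs without a server. In a real client, each task would open its own RConnection, evaluate the model, and close the connection before returning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

// Hypothetical helper, not part of the original example.
public class PooledClientSketch {

    // Runs `workers` tasks in parallel and returns their results
    // in submission order. `task` receives the worker index.
    public static <T> List<T> runAll(int workers, IntFunction<T> task)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (int i = 0; i < workers; i++) {
                final int id = i;
                Callable<T> job = () -> task.apply(id);
                futures.add(pool.submit(job));
            }
            List<T> results = new ArrayList<>();
            for (Future<T> f : futures) {
                // get() blocks until that worker has finished.
                results.add(f.get());
            }
            return results;
        } finally {
            // Stop accepting new tasks; already submitted tasks have completed.
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder task (assumption): replace with code that opens an
        // RConnection, fits the model, and returns the coefficient.
        List<String> out = runAll(4, id -> "worker " + id + " done");
        out.forEach(System.out::println);
    }
}
```

Collecting the results through Future objects replaces the explicit join() calls and lets each worker return a value instead of printing it from inside the thread.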
