Concurrent Code Using Groovy

 
Summary

This post presents a few ways of coding a concurrent program, using Java threading and also a Communicating Sequential Processes (CSP) library. The examples are written in the Groovy language and use a simple lottery number generator as the running example.


Intro

In a prior blog post (Java Plain Old Concurrent Object) I wrote about a possible need for higher-level concurrency support in Java. Here I give the program code that I used in September 2007 to learn more than the rudimentary Java concurrency concepts.

I managed to use CSP and the usual Java concurrency library support, and recently started to try Actors using the new GPars library. It was a good way to get a feel for each approach. Note that it was not enough to become a concurrency expert, and that was not the goal; I am still studying Goetz’s book Java Concurrency in Practice.

My first use of multitasking code was in C++ code I did in 1992:

… technical hurdles with the product, so I put it aside until I could get back to it. One thing that I was proud of was learning a little bit of Fuzzy Logic and using it as the controller. I even wrote a graphical simulator in the C++ language; threading was fun stuff. Watching the fuzzy sets behave like analog surfaces or neural EEG waves gave me the idea for the biomimicry aspects. — An Adaptive Controller Using Fuzzy Memory

Problem

For the code example I used a lottery play generator. Powerball is a U.S. multi-state lottery game. A Powerball play is composed of 5 white balls in the range 1 to 55 and one red ball in the range 1 to 42. As you can imagine, the odds against guessing the jackpot set are extremely high: with these ranges they are 1 in 146,107,962, which is the figure the program prints. The jackpot gets very large, and that is a great enticement to forgo any sanity; “you can’t win if you don’t play”, you know.
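
The jackpot odds follow directly from the ball ranges: the number of ways to choose 5 white balls from 55, multiplied by the 42 possible red balls. A minimal sketch in plain Java (the class and method names are illustrative, not part of the original program):

```java
import java.math.BigInteger;

class PowerballOdds {
    /** Number of ways to choose k items from n, computed exactly. */
    static BigInteger choose(int n, int k) {
        BigInteger result = BigInteger.ONE;
        for (int i = 1; i <= k; i++) {
            // Multiply before dividing so every intermediate value stays an integer.
            result = result.multiply(BigInteger.valueOf(n - k + i))
                           .divide(BigInteger.valueOf(i));
        }
        return result;
    }

    /** Distinct plays: choose the white balls, then one red ball. */
    static BigInteger plays(int whiteMax, int whitePicked, int redMax) {
        return choose(whiteMax, whitePicked).multiply(BigInteger.valueOf(redMax));
    }

    public static void main(String[] args) {
        System.out.println("1 in " + plays(55, 5, 42)); // prints "1 in 146107962"
    }
}
```

Here C(55,5) is 3,478,761, and multiplying by 42 gives 146,107,962.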

If you enjoy the frills, ambiance, and clientele, gambling is a great way to throw away money.   For more on Lottery see: Lottery Links

Source Code

To simulate real random number generation, the program continuously generates plays, but only produces the requested number of generated sets when the user presses the Enter key. I’m assuming that a user waiting a certain amount of time adds a random element to the PRNG being used. Does it? I don’t think so, but that’s another topic.
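
The "keep generating, keep only the latest N" behaviour comes from a fixed-size circular buffer (the listings use Commons Collections' CircularFifoBuffer wrapped by BufferUtils.synchronizedBuffer). As a rough illustration of the idea only, here is a standard-library stand-in in plain Java; the class name LatestPlays is hypothetical and this is not the Commons implementation:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Keeps only the most recent 'capacity' items; older ones are discarded. */
class LatestPlays<T> {
    private final int capacity;
    private final ArrayDeque<T> items = new ArrayDeque<>();

    LatestPlays(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Add a play, evicting the oldest one once the buffer is full. */
    synchronized void add(T play) {
        if (items.size() == capacity) {
            items.removeFirst();
        }
        items.addLast(play);
    }

    /** Copy of the current contents, oldest first. */
    synchronized List<T> snapshot() {
        return new ArrayList<>(items);
    }
}
```

Adding the values 1 through 7 to a buffer of capacity 3 leaves 5, 6, 7: whatever was generated last before the user pressed Enter is what gets reported.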

A few points:

  • The problem could have been solved without using a concurrent approach.
  • The code could be optimized and made more Groovy.
  • I used the Eclipse IDE and the Groovy Eclipse Plugin.
  • Use the example code at your own risk.  No correctness or quality is guaranteed.

A Thread monitor view using VisualVM doesn’t show much, just six threads with four daemon ones.

We use the following batch file, which simply invokes the Main.groovy script.

@rem File: run.cmd
@rem Created 28 Dec 2008 J. Betancourt
call setEnv.cmd
groovy ..\src\main\groovy\net\cox\members\jbetancourt1\Main.groovy %*

The main script driver is shown in Listing 1.  Two arguments are specified: how many dollars to lose and what type of concurrency approach to use.  Executing it with no arguments gives:

c:\Users\jbetancourt\Documents\projects\dev\ConcurrentGroovy-1\bin>run
usage: usage -t type -d dollars [-h]
 -d,--dollars    How many dollars to lose
 -h,--help       Usage information
 -t,--type       Type of threading

Options for '-t' are:
 "simple"    ----> "Using simple Threads"
 "lock"      ----> "Using lock"
 "executor"  ----> "Using Futures"
 "csp"       ----> "Using JCSP"
 "actor"     ----> "Using Actors"
Example: run -d 5 -t simple

And, here is a sample run:

c:\Users\jbetancourt\Documents\projects\dev\ConcurrentGroovy-1\bin>run -d 5 -t simple
Setting up games 5 using "simple"
The odds of winning jackpot are: 1 in 146,107,962.00
Enter return key

Number of games generated are: 5
 1:   7 11 14 23 34 PB:  9
 2:  21 22 27 28 55 PB: 21
 3:   2  9 30 37 46 PB: 30
 4:   7 49 50 51 53 PB: 30
 5:   9 18 24 40 42 PB: 32
Good luck!

Success?

Not really.   A funny thing about this code: if you invoke it and quickly hit the return key, it doesn’t work correctly.  In the run below only one game comes back although five were requested.  Concurrent code can be hard to craft.
Example:

>run -d 5 -t executor
Setting up games 5 using "executor"
Press the Enter key on keyboard

Number of games generated are: 1
 1:  18 20 22 27 41 PB: 25
Good luck!
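
One possible way to avoid a short result like the one above is to have the generator signal when it has produced at least the requested number of plays, and to wait for that signal before stopping it. The sketch below, in plain Java, uses a CountDownLatch for this. It is an assumption about a reasonable fix, not something the original program does; the plays are placeholder integers, the names are illustrative, and 'wanted' is assumed to be at least 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class WarmUpDemo {
    /** Returns exactly 'wanted' placeholder plays (wanted must be at least 1). */
    static List<Integer> generateAtLeast(int wanted) {
        Deque<Integer> plays = new ArrayDeque<>();
        CountDownLatch ready = new CountDownLatch(wanted);

        Thread worker = new Thread(() -> {
            int next = 0;
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (plays) {
                    // Behave like a circular buffer: drop the oldest when full.
                    if (plays.size() == wanted) {
                        plays.removeFirst();
                    }
                    plays.addLast(next++);
                }
                ready.countDown(); // a no-op once the count has reached zero
            }
        });
        worker.setDaemon(true);
        worker.start();

        try {
            // In the real program the wait for the Enter key would come first.
            ready.await();      // block until 'wanted' plays have been produced
            worker.interrupt();
            worker.join();      // the worker has stopped, so reading is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        synchronized (plays) {
            return new ArrayList<>(plays);
        }
    }
}
```

With this arrangement a fast key press merely waits a moment longer instead of reporting a partly filled buffer.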

Listing 1 Main driver class:

package net.cox.members.jbetancourt1;

import org.apache.commons.collections.buffer.CircularFifoBuffer
import org.apache.commons.collections.Buffer
import java.security.SecureRandom
import org.apache.commons.collections.BufferUtils

/**
 * Generate Powerball plays.
 * Required libraries:
 * - Commons CLI
 * - Commons collection
 *
 * A Powerball play is composed of 5 white balls in range 1 - 55
 * and one red ball in the range of 1 - 42
 * This script gets how many games to play, then
 * continually generates this number of games
 * and adds them into a circular buffer.  This is
 * repeated until the user hits a key, which
 * interrupts the generation thread and dumps the results.
 *
 * A circular buffer is used since the results are
 * only needed when the user hits a key,
 * thereby making it "random".  In actuality,
 * since we are using a psuedo-random
 * generator, this is still not true random.
 * There is no way to improve the odds of
 * picking the correct numbers for a lottery game.
 */
 class Main{
 def apps = [
    "simple": [new GuessSimple(),"Using simple Threads"],
    "lock":   [new GuessWithLock(),"Using lock"],
    "executor": [new GuessWithExecutor(),"Using Futures"],
    "csp": [new GuessCSP(),"Using JCSP"],
    "actor" : [new GuessActor(),"Using Actors"]
 ]

 /** Main entry point */
 static main(args){
    def main = new Main()

    def options = main.parseArguments(args)
    if((options != null ) && options.d && options.t){
        // options.d is a String; parse it first and compare the number, not the text
        def dollars = Integer.parseInt(options.d)
        println "Setting up game" + (dollars > 1 ? "s" : "") +
        " ${dollars} using \"${options.t}\""
        def games = BufferUtils.synchronizedBuffer(
           new CircularFifoBuffer(dollars))

        def threadType = options.t
        def guess = main.apps[threadType][0]
        guess.generate(dollars,games)
    }

    System.exit(0)
 } // end main

 /****************************************************************************
 * Using CLI builder, parse the command line.
 */
 def OptionAccessor parseArguments(String[] args){
    def cli = new CliBuilder(usage: 'usage -t type -d dollars [-h]')
    cli.h(longOpt: 'help', 'Usage information')
    cli.t(longOpt: 'type', 'Type of threading',args:1)
    cli.d(longOpt: 'dollars','How many dollars to lose',args:1,required:false)
    def options = cli.parse(args)

    if(options == null || options.getOptions().size() == 0 || options.h){
        cli.usage()
        println "\nOptions for '-t' are: "
        apps.each{
            println("\t\"${it.key}\"".
                padRight(12) + " ----> \"${it.value[1]}\"")
        }
        println "\nExample: run -d 5 -t simple"
    }

    return options
 }
} // end class Main

Listing 2 Base class:

package net.cox.members.jbetancourt1;

import org.apache.commons.collections.buffer.CircularFifoBuffer
import org.apache.commons.collections.Buffer
import java.security.SecureRandom
import org.apache.commons.collections.BufferUtils

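/**
 * Base class holding the ball sets, the random picks,
 * and the report shared by all of the examples.
 */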
class GuessBase{
 def whiteBalls = [0]
 def redBalls = [0]
 def random = new SecureRandom()
 //each element is an array of 0-4 white ball values and 5 is the red ball value.
 def games //= BufferUtils.synchronizedBuffer(new CircularFifoBuffer(dollars))

 def createBallSets(){
      def i = 0
      for(j in 1..55){
        whiteBalls[i++] = j
      }
      i = 0
      for(j in 1..42){
         redBalls[i++] = j
      }
 }

 /** Pick the set of white balls */
 def pickWhite(){
     def array = []
     array.addAll(whiteBalls)
     Collections.shuffle(array,random)
     return array[0..4]
 }

 /** swap two elements in a list */
 def swapElements(list,i,j){
      def temp = list[i]
      list[i] = list[j]
      list[j] = temp
 }

 /**  Pick the single red 'powerball'. */
 def pickRed(){
      int offset = random.nextInt(redBalls.size())
      return redBalls[offset]
 }

 def showGames(){
     def gamesSorted = games.sort(){a,b -> a[5] <=> b[5]}
     def rownum = 1
     println "Number of games generated are: ${games.size()}"
     for (loop in gamesSorted){
       print ((rownum++ + ": ").padLeft(8))
       for(x in (  loop[0..4]).sort()  ){
         print ((x.toString()).padLeft(3))
       }
       println " PB:" + ((loop[5]).toString()).padLeft(3)
     }
     println "Good luck!"
 }

} // end class GuessBase

Listing 3 Using simple Java Thread class:

package net.cox.members.jbetancourt1;

/**
 * Example that uses a simple 'inline' thread.
 */
class GuessSimple extends GuessBase{

 /** create d guesses and store into games array */
 def synchronized generate(d,games){
   this.games = games
   createBallSets()

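   // Thread.startDaemon is a Groovy GDK static method: it creates and starts
   // a daemon thread that runs the closure, and returns that thread.
   // (Calling start{...} on an existing Thread instance resolves to the static
   // GDK method too, so it would start a different thread than the instance.)
   def worker = Thread.startDaemon {
     while(!Thread.currentThread().isInterrupted()){
       def w = pickWhite()
       def r = pickRed()
       games.add((Object)(w+r))
       random.setSeed(System.currentTimeMillis());
     }
   }

  println "The odds of winning jackpot are: 1 in 146,107,962.00\nEnter return key"
  new InputStreamReader(System.in).readLine()

  worker.interrupt()
  worker.join() // wait for the worker to stop before reading the games
  showGames()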
 }
} // End of GuessSimple.groovy
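
The fixed one-second sleep in a listing like this is a way of giving the worker time to notice the interrupt. A more direct alternative is to join the worker thread, which returns as soon as it has actually exited. A minimal sketch in plain Java, with a counter standing in for the game generation (names are illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

class InterruptAndJoin {
    /** Runs a counting worker for roughly 'millis' ms, stops it, and returns the count. */
    static long runFor(long millis) {
        AtomicLong counter = new AtomicLong();
        Thread worker = new Thread(() -> {
            // The interrupt flag is polled, so the loop exits soon after interrupt().
            while (!Thread.currentThread().isInterrupted()) {
                counter.incrementAndGet();
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            Thread.sleep(millis);   // stands in for waiting on the Enter key
            worker.interrupt();
            worker.join();          // returns only after the worker has exited
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return counter.get();       // no further updates can happen now
    }

    public static void main(String[] args) {
        System.out.println("iterations: " + runFor(100));
    }
}
```

Because join() establishes a happens-before relationship with everything the worker did, the value read afterwards is final.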

Listing 4 Using Object lock:


package net.cox.members.jbetancourt1;

/**
 * Example that uses an object lock (wait/notify) so that the main
 * thread knows when the worker has stopped.
 */
class GuessWithLock extends GuessBase{
  def lock = new Object() // for thread synchronization
  def done = false        // guarded by lock; set when the worker has stopped
  /** create d guesses and store into games array */
  def generate(d,games){
    this.games = games
    createBallSets()

    def worker = new Thread(){
      void run(){
        while(!Thread.currentThread().isInterrupted()){
          def w = pickWhite()
          def r = pickRed()
          games.add((Object)(w+r))
          random.setSeed(System.currentTimeMillis());
        }
        synchronized(lock){
          done = true
          lock.notify()
        }
      }
    }

    worker.start()

    println "The odds of winning jackpot are: 1 in 146,107,962.00"
    println "Enter any key<cr>"
    new InputStreamReader(System.in).readLine()
    worker.interrupt()
    // wait for the worker's notify; the loop guards against spurious wakeups
    synchronized(lock){
      while(!done){
        lock.wait()
      }
    }
    showGames()
  }
}
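
A notify() is only useful if another thread waits on the same lock, and the waiting side should test a condition in a loop, because a wakeup can arrive spuriously or the notify can arrive before the wait starts. The general pattern, sketched in plain Java with illustrative names:

```java
class StopSignal {
    private final Object lock = new Object();
    private boolean done = false;      // guarded by lock

    /** Called by the worker as its last action. */
    void markDone() {
        synchronized (lock) {
            done = true;
            lock.notifyAll();
        }
    }

    /** Blocks until markDone() has been called; returns false if interrupted first. */
    boolean awaitDone() {
        synchronized (lock) {
            try {
                while (!done) {        // the loop guards against spurious wakeups
                    lock.wait();
                }
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        StopSignal signal = new StopSignal();
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.yield();        // stands in for generating plays
            }
            signal.markDone();
        });
        worker.start();
        worker.interrupt();
        System.out.println("worker finished: " + signal.awaitDone());
    }
}
```

Since the flag is set under the lock, it does not matter whether the worker finishes before or after the main thread starts waiting.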

Listing 5 Using Executor:

/**
* File: GuessWithExecutor.groovy
* Author: Josef Betancourt
* Date:  9/1/2007
*
*/
package net.cox.members.jbetancourt1;

import org.apache.commons.collections.*
import java.util.concurrent.*

/**
 * Generate powerball plays using Executor.
 */
 class GuessWithExecutor extends GuessBase {

 /** create d guesses and store into games array */
 def generate(d,games){
       def dollars = d
       try{
         def exec = Executors.newSingleThreadExecutor()
         def gen = new GameGenerator(dollars,games)
         gen.createBallSets()
         def future = exec.submit(gen)
         println "Press the Enter key on keyboard"
         new InputStreamReader(System.in).readLine()
         exec.shutdownNow() // interrupts the running generator
         waitForTermination(exec,100,10) // 100ms, max of 10 tries

         try{
           def buffer = future.get()
           def exec2 = Executors.newSingleThreadExecutor()
           exec2.execute(new GenerateReport(buffer))
           exec2.shutdown()
           // let the report finish before Main calls System.exit
           waitForTermination(exec2,100,10)
         }catch(ExecutionException ex){
            ex.printStackTrace()
         }
       }catch(Exception ex){
           ex.printStackTrace()
       }
 }

 /**
 * Wait for the executor to terminate, polling a limited number of times.
 * @param exec the ExecutorService
 * @param time how long to wait on each try, in milliseconds
 * @param tries how many times to wait
 */
 def waitForTermination(exec,time,tries) throws Exception {
    def count = 0
    while(!exec.awaitTermination(time, TimeUnit.MILLISECONDS)){
        if(count++ >= tries){
           break
        }
    }
 }

 } // end class GuessWithExecutor

 /**
 * A Callable that generates games until interrupted.
 */
 def class GameGenerator extends GuessBase implements Callable {
       def GameGenerator(dollars,games){
                this.games = games
       }

      /**
       * Create d guesses and store into games array.
       * @see java.util.concurrent.Callable#call()
       */
     public Object call() throws Exception {
         while(!Thread.currentThread().isInterrupted()){
             def w = pickWhite()
             def r = pickRed()
             games.add((Object)(w+r))
             random.setSeed(System.currentTimeMillis());
         }
         return games
     }

 }  // end class GameGenerator

 /**
 * Class to render the results
 */
 def class GenerateReport extends GuessBase implements Runnable {
      def GenerateReport(Buffer games){
              this.games = games
      }

      public void run(){
         showGames()
     }
} // end class GenerateReport

Listing 6 Using Fork/Join:


// To do: the approach will be to fork each required game set into its own task, then join them to get the final result.  Crazy, but a way to learn about Fork/Join and have sample code.
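
As a rough idea of what that could look like, here is a sketch in plain Java using the java.util.concurrent Fork/Join classes: one task per play, forked from a parent task that joins them all. This is not the author's code; the class names are hypothetical and, unlike the other listings, it generates a fixed number of plays rather than running until a key press.

```java
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinPlays {
    /** Produces one play: five sorted white balls followed by the red ball. */
    static class OnePlay extends RecursiveTask<List<Integer>> {
        @Override
        protected List<Integer> compute() {
            SecureRandom random = new SecureRandom();
            List<Integer> white = new ArrayList<>();
            for (int i = 1; i <= 55; i++) {
                white.add(i);
            }
            Collections.shuffle(white, random);
            List<Integer> play = new ArrayList<>(white.subList(0, 5));
            Collections.sort(play);
            play.add(random.nextInt(42) + 1);   // red ball in 1..42
            return play;
        }
    }

    /** Forks one task per requested play, then joins them all. */
    static class AllPlays extends RecursiveTask<List<List<Integer>>> {
        private final int count;

        AllPlays(int count) {
            this.count = count;
        }

        @Override
        protected List<List<Integer>> compute() {
            List<OnePlay> tasks = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                OnePlay task = new OnePlay();
                task.fork();                    // schedule asynchronously
                tasks.add(task);
            }
            List<List<Integer>> plays = new ArrayList<>();
            for (OnePlay task : tasks) {
                plays.add(task.join());         // wait for each result
            }
            return plays;
        }
    }

    static List<List<Integer>> generate(int count) {
        return ForkJoinPool.commonPool().invoke(new AllPlays(count));
    }

    public static void main(String[] args) {
        generate(5).forEach(System.out::println);
    }
}
```

For work this small the task overhead outweighs any gain, which fits the "crazy, but a way to learn" spirit of the note above.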

CSP

Some experts are touting the benefits of CSP. Below I use the jCSP library to create a network of processes that collaborate to solve the same problem. Note that some of the people involved in jCSP are adding some CSP support to GPars.

Listing 7 Using jCSP:

/**
* File: CspGuess.groovy
* Author: Josef Betancourt
* Date:  9/1/2007
*
*
*/

package net.cox.members.jbetancourt1;

import jcsp.lang.*;

/**
 * Generate powerball plays using CSP.
 *
 * This version of the example uses the JCSP library.  Note
 * that unlike the other examples, there is an explicit
 * declaration of a 'network'.
 *
 * Required libraries:
 * - JCSP ver. 1.0-rc8 CSP library in Java:
 * <a href="http://www.cs.ukc.ac.uk/projects/ofa/jcsp"></a><br/>
 *
 * Uses PAR and ALT methods presented in:
 * <a href="http://www.soc.napier.ac.uk/publication/op/getpublication/publicationid/9097759">
 * "Groovy Parallel!  A Return to the Spirit of occam?"</a>
 * by Jon KERRIDGE, Ken BARCLAY, and John SAVAGE
 * The School of Computing, Napier University, Edinburgh EH10 5DT in
 * Communicating Process Architectures 2005 13
 * Jan Broenink, Herman Roebbers, Johan Sunter, Peter Welch, and David Wood (Eds.)
 * IOS Press, 2005
 *
 * Perhaps, a better approach is possible using actors?  See for example,
 * <a href="http://gpars.codehaus.org/">Groovy Parallel Systems</a>
 *
 */
class GuessCSP extends GuessBase {
 def generate(dollars,games){
       def suspendChannel = new One2OneChannel()
       def resultChannel = new One2OneChannel()

       // Create the CSP network and run it.
       new PAR(  [
            new GameProcess(suspendChannel,resultChannel,games,dollars),
            new UserInputProcess(suspendChannel),
            new ReportProcess(resultChannel)]
       ).run()
 }

}

/** process that generates games */
def class GameProcess extends GuessBase implements CSProcess{
    def out
    def suspend
    def dollars

    /**  Create the process instance */
    def GameProcess(done,out,games,dollars){
       this.dollars = dollars
       this.suspend = done
       this.out = out
       this.games = games
       createBallSets()
    }

    /** run the process network */
    public void run(){
       def alternative = new ALT([suspend, new Skip()]) // Skip: a guard that is always ready
       def STOPPING=0, RUNNING=1
       println "Creating  ${dollars} game${(dollars>1? "s": "")}  "
       def suspended = false
       while (!suspended){
          switch (alternative.priSelect ())
             {
               case STOPPING:
                 suspend.read();
                 if(games.size()>0){
                      suspended = true;
                      out.write(games)
                 }
                 break;
               case RUNNING:
                 def w = pickWhite()
                 def r = pickRed()
                 games.add((Object)(w+r))
                 random.setSeed(System.currentTimeMillis());
                 break
             }
      }
 } // end run()

} // end GameProcess process

/** Process that gets the user input */
def class UserInputProcess implements CSProcess{
     def suspend

     def UserInputProcess(done){
            this.suspend = done
     }

     public void run(){
              println "Enter any key<cr>"
              new InputStreamReader(System.in).readLine()
              suspend.write(new Object())
     }
} // end UserInputProcess process

/**
 * Process to render the results to console
 */
def class ReportProcess extends GuessBase implements CSProcess{
 def input

 def ReportProcess(input){
     this.input = input
 }

 public void run(){
     games = input.read() // get result via channel only
     showGames()
 }
} // end of ReportProcess process

/**
 * PAR A Groovyish Parallel.
 */
def private class PAR extends Parallel {
       PAR(processList){
            super( processList.toArray(new CSProcess[0]))
       }
}

/**
 * ALT A Groovyish Alternative.
 */
def private class ALT extends Alternative {
       ALT (guardList) {
              super( guardList.toArray(new Guard[0]) )
       }
}
// end of CspGuess.groovy
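
For readers without jCSP at hand, the shape of this network can be approximated with standard-library classes. The sketch below, in plain Java, uses SynchronousQueue as a stand-in for an unbuffered one-to-one channel and a non-blocking poll() as a stand-in for the prioritised choice between the suspend channel and an always-ready guard. It is only an analogy under those assumptions, not jCSP, and the plays are placeholder integers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class ChannelSketch {
    /** Runs generator and "user" until the suspend signal; returns the games. */
    static List<Integer> run(int wanted) {
        SynchronousQueue<Object> suspend = new SynchronousQueue<>();
        SynchronousQueue<List<Integer>> result = new SynchronousQueue<>();

        Thread generator = new Thread(() -> {
            List<Integer> games = new ArrayList<>();
            int next = 0;
            try {
                while (true) {
                    // "RUNNING" branch: produce another placeholder play,
                    // keeping only the latest 'wanted' of them.
                    games.add(next++);
                    if (games.size() > wanted) {
                        games.remove(0);
                    }
                    // "STOPPING" branch: poll() succeeds only if a writer
                    // is currently waiting on the suspend channel.
                    if (suspend.poll() != null) {
                        result.put(games);
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        generator.setDaemon(true);
        generator.start();

        try {
            suspend.put(new Object());  // stands in for the user pressing Enter
            return result.take();       // the report side reads the result channel
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

As in the jCSP version, the games only change hands through a channel, so the reader never touches the list while the generator is still writing to it.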

Actor

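I started to look into the Actor approach. Below is the start of code to solve the same problem. Even with these few lines of code, attempts to run it give an exception, and classpath and other easy fixes do not solve it. The message says that actor() is not a static method of the Actor class. In GPars the actor factory method is on groovyx.gpars.actor.Actors (note the commented-out static import in Listing 8), so calling Actors.actor is the likely fix. On my todo list.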

Setting up games 5 using "actor"
start
Caught: groovy.lang.MissingMethodException: No signature of method: static groovyx.gpars.actor.Actor.actor() is applicable for argument types: (net.cox.members.jbetancourt1.GuessActor$_generate_closure1) values: [net.cox.members.jbetancourt1.GuessActor$_generate_closure1@1fe571f]
Possible solutions: stop(), wait(), start(), any(), call(java.lang.Object), wait(long)
	at net.cox.members.jbetancourt1.GuessActor.generate(GuessActor.groovy:55)
	at net.cox.members.jbetancourt1.Main.main(Main.groovy:63)

Listing 8 Using Actors:

package net.cox.members.jbetancourt1
//import static groovyx.gpars.actor.Actors.actor
import groovyx.gpars.actor.*
import groovyx.gpars.actor.Actor.*
// @Grab(group='org.codehaus.gpars', module='gpars', version='0.9')
// @GrabResolver(name='jboss', root='http://repository.jboss.org/maven2/')

/**
 */
class GuessActor extends GuessBase  {
	static main(args){
		def main = new GuessActor()
		def dollars
		def games
		main.generate(dollars, games)
	}

	def generate(dollars,games){
		println "start"

		// This call is the one that raises the MissingMethodException shown above.
		def gen = Actor.actor { index ->
			loop {
				react {message ->
					if (message instanceof String) reply "got it"
					else stop()
				}
			}
		}
	}

} // end of class GuessActor
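
The behaviour the closure above is aiming for (reply "got it" to a String, stop on anything else) can be illustrated without GPars. The following plain Java sketch is a hand-rolled mailbox, not the GPars API; every name in it is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MailboxActor implements Runnable {
    /** A message plus the queue on which the sender expects the reply. */
    static final class Envelope {
        final Object message;
        final BlockingQueue<String> replyTo;

        Envelope(Object message, BlockingQueue<String> replyTo) {
            this.message = message;
            this.replyTo = replyTo;
        }
    }

    private final BlockingQueue<Envelope> mailbox = new LinkedBlockingQueue<>();

    /** Asynchronous send: the caller never blocks. */
    void send(Object message, BlockingQueue<String> replyTo) {
        mailbox.add(new Envelope(message, replyTo));
    }

    /** Message loop: one message at a time, so no locking of actor state is needed. */
    @Override
    public void run() {
        try {
            while (true) {
                Envelope e = mailbox.take();
                if (e.message instanceof String) {
                    e.replyTo.add("got it");
                } else {
                    return;             // any non-String message stops the actor
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sends one String message, stops the actor, and returns its reply. */
    static String ask(String message) {
        MailboxActor actor = new MailboxActor();
        Thread thread = new Thread(actor);
        thread.start();
        BlockingQueue<String> replies = new LinkedBlockingQueue<>();
        actor.send(message, replies);
        try {
            String reply = replies.take();
            actor.send(Boolean.FALSE, replies);   // non-String: asks the actor to stop
            thread.join();
            return reply;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
    }
}
```

A call such as MailboxActor.ask("hello") returns "got it".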


Declarative CSP Using Spring

While experimenting I had an idea that ordinary Java beans could be used in a CSP network with suitable object wrappers or weaving.  That is, an ordinary POJO could be wrapped to appear as a Process: just hook up the bean’s service entry point to the CSP channel end points.  This would be similar to the way that ordinary beans can be annotated to behave as Active Objects in the GPars Groovy library.

Below is the Spring Framework configuration file that does this for this software example.  The code for the wrapper objects is not included here; it is too kludgey.
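
Since the wrapper code is not shown, the following is only a guess at the general shape such a wrapper could take, sketched in plain Java: a Runnable that calls a named no-argument method on a target object once, via reflection, and writes the result to an output queue. A BlockingQueue stands in for a CSP channel, and the class name SingleShotWrapper and its constructor parameters are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical wrapper: makes a plain object usable as a one-shot process. */
class SingleShotWrapper implements Runnable {
    private final Object target;
    private final String methodName;
    private final BlockingQueue<Object> output;

    SingleShotWrapper(Object target, String methodName, BlockingQueue<Object> output) {
        this.target = target;
        this.methodName = methodName;
        this.output = output;
    }

    /** Invokes the named public no-arg method once and publishes its non-null result. */
    @Override
    public void run() {
        try {
            Method method = target.getClass().getMethod(methodName);
            output.put(method.invoke(target));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot call " + methodName, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Object> out = new LinkedBlockingQueue<>();
        new SingleShotWrapper("abc", "length", out).run();
        System.out.println(out.poll()); // prints 3
    }
}
```

The target object knows nothing about channels or processes; all of the wiring lives in the wrapper, which is what makes a declarative configuration possible.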

Off topic: The XML config language of Spring is looking long in the tooth. Good thing Spring now supports annotations and Java config. Time for a Groovy builder approach too?


Listing 9 bean configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<!--
 File: applicationContext.xml
 Spring Framework based bean definitions for JCSP example program.
 Author: Josef Betancourt
 Date:  9/19/2007
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:lang="http://www.springframework.org/schema/lang"  xmlns:util="http://www.springframework.org/schema/util"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
     http://www.springframework.org/schema/lang
     http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
     http://www.springframework.org/schema/util
     http://www.springframework.org/schema/util/spring-util-2.0.xsd">

 <!-- ========================================================= -->
 <!-- CSP NETWORK -->
 <!-- ========================================================= -->
 <bean name="network" dependency-check="objects" class="jcsp.lang.Parallel">
     <constructor-arg>
         <list>
              <ref bean="gameProcess" />
              <ref bean="reportProcess" />
              <ref bean="userInputProcess" />
              <ref bean="progressProcess"/>
         </list>
     </constructor-arg>
 </bean>

 <!-- ========================================================= -->
 <!-- CHANNELS -->
 <!-- ========================================================= -->
 <bean name="suspendChannel" class="jcsp.lang.One2OneChannel"/>
 <bean name="outputChannel" class="jcsp.lang.One2OneChannel"/>
 <bean name="progressChannel" class="jcsp.lang.One2OneChannel"/>
 <bean name="skipperChannel" class="jcsp.lang.Skip"/>

 <!-- ========================================================= -->
 <!-- GUARDS  -->
 <!-- ========================================================= -->
 <bean name="alternative" class="jcsp.lang.Alternative" dependency-check="objects">
        <constructor-arg>
            <list>
               <ref bean="suspendChannel" />
               <ref bean="skipperChannel" />
            </list>
        </constructor-arg>
 </bean>

 <!-- ========================================================= -->
 <!-- POJOs -->
 <!-- ========================================================= -->
 <!--  Creates the game generator, but it is setup programmatically
 since it needs the user supplied number of games to play, see GameGenerator.setup(int).
 -->
 <bean name="gameGenerator" class="net.cox.members.jbetancourt1.pb.game.GameGenerator">
      <property name="random">
         <bean class="java.security.SecureRandom"/>
      </property>
      <property name="maxRed" value="42" />
      <property name="maxWhite" value="55" />
 </bean>

 <!-- ========================================================= -->
 <!-- PROCESSES -->
 <!-- ========================================================= -->
 <!-- Receives signal from user to accept current generated game sets.
 -->
 <bean name="userInputProcess" init-method="init" class="net.cox.members.jbetancourt1.pb.process.JCspSingleShotProxy">
    <property name="outputChannel" ref="suspendChannel" />
    <property name="methodName" value="run"/>
    <property name="target">
            <bean class="net.cox.members.jbetancourt1.pb.game.KeyListener" scope="prototype"/>
    </property>
 </bean>

 <!--  Game generator.  Communicates with input and report processes.
 -->
 <bean name="gameProcess" dependency-check="objects"
            class="net.cox.members.jbetancourt1.pb.process.GameProcess">
     <property name="alternative" ref="alternative" />
     <property name="suspendChannelIn" ref="suspendChannel" />
     <property name="reportChannelOut" ref="outputChannel" />
     <property name="progressChannelOut" ref="progressChannel"/>
     <property name="gameGenerator" ref="gameGenerator"/>
     <property name="showInterim" value="false"></property>
 </bean>

 <!-- Output results to console.
 -->
 <bean name="reportProcess" dependency-check="objects"
         class="net.cox.members.jbetancourt1.pb.process.ReportProcess" scope="prototype">
       <property name="input" ref="outputChannel" />
 </bean>

 <!-- Output interim results to console
 -->
 <bean name="progressProcess" dependency-check="objects"
    class="net.cox.members.jbetancourt1.pb.process.ReportProcess" scope="prototype">
      <property name="input" ref="progressChannel" />
 </bean>

</beans>

Listing 10 Java main to load the network:

package net.cox.members.jbetancourt1.pb;

import jcsp.lang.Parallel;
import net.cox.members.jbetancourt1.pb.game.GameGenerator;

import org.apache.commons.cli.*;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 
 * Main program.
 * 
 * @author JBetancourt
 * 
 */
public class Main {
    private static final String TITLE = 
    "====== Lottery Generator using JCSP and Spring ============";
    private static Parallel network;
    private static ApplicationContext context;

    /**
     * Entry point for running the Guess application. Parses 
     * command line for number of games to play, loads 
     * wired application using Spring Framework
     * based IoC, sets up game array, and then starts 
     * the network of processes. 
     * @param args
     */
    public static void main(String[] args) {
        System.out.println(TITLE);
        int dollars = parseCommandLine(args);
        if ( dollars == 0 ) {
            return;
        }

        context = new ClassPathXmlApplicationContext(
            new String[] { "applicationContext.xml"});
        
        GameGenerator gen = (GameGenerator) context.
        getBean("gameGenerator");
        gen.setup(dollars);

        network = (Parallel) context.getBean("network");
        network.run();
    }

    /**
     */
    private static int parseCommandLine(String[] args) {
        // ... elided ...
    }

}

Further Reading

Lottery Links
