DBZoo - Talent without discipline is like an Octopus on rollerskates.

Scala distributed Mandelbrot

My first foray into distributed programming with scala. This worked out pretty well. The whole system automatically load balances and recovers elegantly if a worker is killed half way through a render or indeed if another joins whilst a render is in progress.

It has no bells and whistles with regards to zooming and panning so as keep the code as simple as possible.

Running a worker is as easy as. The IP and port on the worker are the IP address of the GUI, and the LOCAL port which the worker will used for listening. If you want to many worker on the same server give each a unique port number

# export CLASSPATH=.:/usr/local/scala/lib/scala-library.jar
# cd MandelbrotDistributed/build/classes
# java mandelbrotdistributed/MandelWorker 192.168.1.137 4000
"another worker on the same server"
# java mandelbrotdistributed/MandelWorker 192.168.1.137 4001

This is the entire code:

/*
 * Raster.scala
 */
package mandelbrotdistributed
 
@serializable
class Raster(xlen : Int) {
    var line = new Array[Int](xlen)
    def width = line.length
}
/*
 * Complex.scala
 */
package mandelbrotdistributed
 
class Complex (
    val a: Double,
      val b: Double)
{
    def abs() = Math.sqrt(a*a + b*b)
    // (a,b)(c,d) = (ac - bd, bc + ad)
    def *(that: Complex) = new Complex(a*that.a-b*that.b, b*that.a+a*that.b)
    def +(that: Complex) = new Complex(a+that.a, b+that.b)
    override def toString = a + " + " + b+"i"
}
/*
 * MandelWorker.scala
 */
package mandelbrotdistributed
 
import scala.actors.Actor._
import scala.actors.remote._
import scala.actors.remote.RemoteActor.select
import java.lang.Math
import java.net.InetAddress
 
case class Register(me:Locator)
case class RenderedLine(m:Locator,row:Int,raster:Raster)
case class RenderAction(row:Int,width:Int,height:Int,level:Int)
case class Tick
 
class MandelActor(me : Locator, clientLoc : Locator) {
    RemoteActor.classLoader = getClass().getClassLoader()
 
    actor {
        println("Worker Ready")
        RemoteActor.alive(me.node.port)
        RemoteActor.register(me.name, self)
        loop {
            react {
                case RenderAction(row : Int, width : Int, height : Int, level : Int) =>
                    println("Raster row "+row)
                    sender ! RenderedLine(me,row, generate(width, height, row, level))
                case msg =>
                    println("Unhandled message: " + msg)
            }
        }
    }
 
    // Register with the GUI every 5 secs (heartbeat)
    val client = select(clientLoc.node,clientLoc.name)
    ActorPing.scheduleAtFixedRate(client, Register(me), 0L, 5000L)
 
    def iterate(z:Complex, c:Complex, level:Int, i:Int): (Complex,Int) =
        if(z.abs > 2 || i > level) (z,i) else iterate(z*z+c, c, level, i+1)
 
    def generate(width : Int, height : Int, row : Int, level: Int) : Raster = {
        val raster = new Raster(width)
        val y = -1.5 + row*3.0/height
        for { x0 <- 0 until width}
        {
            val x = -2.0 + x0*3.0/width
            val (z, i) = iterate(new Complex(0,0), new Complex(x,y), level, 0)
            raster.line(x0) = if (z.abs < 2) 0 else i
        }
        raster
    }
}
 
object MandelWorker {
    def main(args: Array[String]) :Unit = {
        // arg0: remote IP of where the MandelGUI program is running
        // arg1: a local port for the mandel worker
        val host = if (args.length >= 1) args(0) else "127.0.0.1"
        val port = if (args.length >= 2) args(1).toInt else 9010
        val gui = Locator(Node(host,9999), 'MandelGUI)
        val me = new Locator(Node(InetAddress.getLocalHost.getHostAddress, port), 'MandelWorker)
        new MandelActor(me, gui)
    }
}
/*
 * MandelGui.scala
 */
 
package mandelbrotdistributed
 
import scala.swing._
import scala.swing.event.ButtonClicked
import scala.actors.{Actor,OutputChannel}
import scala.actors.Actor._
import scala.actors.remote.{RemoteActor,Locator,Node}
import scala.actors.remote.RemoteActor._
import scala.collection.mutable.Stack
import java.awt.image.BufferedImage
import java.awt.{Graphics,Graphics2D}
import java.awt.Color
 
object MandelGui extends SimpleGUIApplication {
    val img = new BufferedImage(480,480,BufferedImage.TYPE_INT_RGB)
    val mandel = Mandel
 
    val drawing = new Panel {
        background = Color.black
        preferredSize = (img.getWidth, img.getHeight)
 
        override def paintComponent(g : Graphics) : Unit = {
            g.asInstanceOf[Graphics2D].drawImage(img,null,0,0)
        }
    }
 
    def clearDrawing() {
        var g = img.getGraphics
        g.setColor(Color.BLACK)
        g.fillRect(0,0,img.getWidth,img.getHeight)
    }
 
    def top = new MainFrame {
        title="Mandelbrot"
        contents = new BorderPanel {
 
            val control = new BoxPanel(Orientation.Horizontal) {
                val start = new Button {
                    text = "Start"
                }
                val stop = new Button {
                    text = "Stop"
                }
                val continue = new Button {
                    text = "Continue"
                }
                contents.append(start,stop,continue)
                listenTo(start,stop,continue)
                reactions += {
                    case ButtonClicked(`start`) =>
                        clearDrawing
                        Mandel.startup
                    case ButtonClicked(`stop`) =>
                        Mandel.shutdown
                    case ButtonClicked(`continue`) =>
                        Mandel.process
                }
            }            
            drawing
 
            import BorderPanel.Position._
            layout(control) = North
            layout(drawing) = Center                                   
        }
    }
 
    object WorkerMgmt {
        private var allWorkers : List[Worker] = List()
        val defaultTTL = 6  //sweeps a worker can survive without a register
 
        def foreach(op: Worker => Unit) = allWorkers.foreach(op)
 
        def findWorkerForRow(row : int) : Worker = {
            allWorkers.filter(w=> w.row == row)(0)
        }
 
        def find(m:Locator) : Worker = {
            if (allWorkers.isEmpty) null
            else {
                val list = allWorkers.filter(w=> w.loc == m)
                if(list.isEmpty) null else list(0)
            }
        }
 
        def register(m:Locator) : Worker = {
            var worker = find(m)
            if (worker == null) {
                worker = new Worker(m, defaultTTL)
                allWorkers = worker :: allWorkers
            }
            worker.keepAlive
        }
 
        def sweep() = {
            allWorkers.foreach(_.decTTL)
            val (ok, expired) = allWorkers span (_.ttl >= 0)
            allWorkers = ok
            expired
        }
    }
 
    class Worker(val loc:Locator, val defaultTTL:int) {
        var row : Int = 0
        var ttl:Int = 0
        val actor = select(loc.node,loc.name)
        val iterationDepth = 2048
 
        def decTTL = ttl -= 1
 
        def keepAlive() = {
            ttl = defaultTTL
            this
        }
 
        def render(row : Int) {
            this.row = row
            actor ! RenderAction(row, img.getWidth, img.getHeight, iterationDepth)
        }
 
        override def toString = loc.toString
    }
 
    object Mandel {        
 
        object State extends Enumeration {
            val Running, Stopped = Value
        }
        private var state = State.Stopped        
        private var workQueue : Stack[Int] = new Stack()
 
        val draw = actor {
            Actor.loop {
                react {
                    case (row:Int, raster:Raster) =>
                        for(x <- 0 until raster.width) {
                            val shade = raster.line(x) % 256
                            val rgb = new Color(shade,shade,shade).getRGB
                            img.setRGB(x, row, rgb)
                        }
                        drawing.repaint
                }
            }
        }
 
        val a = actor {
            RemoteActor.alive(9999) // Port
            RemoteActor.register('MandelGUI, Actor.self)
            // Sweep non responsive workers every 2sec
            ActorPing.scheduleAtFixedRate(Actor.self, Tick, 0L, 2000L)
 
            Actor.loop {
                react {
                    case "StartWork" =>
                        //print("Start work")
                        WorkerMgmt.foreach(farmWork)
                    case RenderedLine(m:Locator,row:int, raster:Raster) =>
                        draw ! (row, raster) // Get it on the screen
                        if(state == State.Running) farmWork(WorkerMgmt.find(m))
                    case Register(m:Locator) =>
                        println("Register "+m)
                        // Register and assign it work; Immediate load balance
                        farmWork(WorkerMgmt.register(m))
                    case Tick =>
                        for(w <- WorkerMgmt.sweep) {
                            println("Unregister "+w)
                            workQueue.push(w.row)  // push their row
                        }
                    case msg =>
                        println("Unhandled message: "+msg)
                }
            }
        }
 
        def farmWork(worker : Worker) {
            if(workQueue.isEmpty)
                shutdown
            else {
                if(worker != null) worker.render(workQueue.pop)
            }
        }
 
        def shutdown() {
            state = State.Stopped
        }
 
        def startup() {
            if(state == State.Stopped) {                
                workQueue.clear
                for(row <- 0 to img.getHeight-1) workQueue.push(row)
                process
            }
        }
 
        def process() {
            state = State.Running                
            a ! "StartWork"
        }
    }
}
/*
 * ActorPing.scala
 */
 
package mandelbrotdistributed
 
import java.util.concurrent._
import scala.actors._
 
// =============================================
/**
 * Pings an actor every X seconds.
 *
 * Borrowed from Scala TIM sample; which borrows from:
 *
 * Code based on code from the ActorPing class in the /lift/ repository (http://liftweb.net).
 * Copyright:
 *
 * (c) 2007 WorldWide Conferencing, LLC
 * Distributed under an Apache License
 * http://www.apache.org/licenses/LICENSE-2.0
 */
object ActorPing {
 
  def scheduleAtFixedRate(to: AbstractActor, msg: Any, initialDelay: Long, period: Long): ScheduledFuture[T] forSome {type T} = {
    val cmd = new Runnable {
      def run {
        // println("***ActorPing Event***");
        try {
          to ! msg
        }
        catch {
        case t:Throwable => t.printStackTrace
        }
      }
    }
    service.scheduleAtFixedRate(cmd, initialDelay, period, TimeUnit.MILLISECONDS)
  }
 
  private val service = Executors.newSingleThreadScheduledExecutor(threadFactory)
 
  private object threadFactory extends ThreadFactory {
    val threadFactory = Executors.defaultThreadFactory()
    def newThread(r: Runnable) : Thread = {
      val d: Thread = threadFactory.newThread(r)
      d setName "ActorPing"
      d setDaemon true
      d
    }
  }
}

Entire work as NetBeans project : mandelbrotdistributed.zip

blog/scala_distributed_mandelbrot.txt · Last modified: 2009/11/27 17:53 (external edit)