Scala distributed Mandelbrot
My first foray into distributed programming with Scala. This worked out pretty well. The whole system automatically load-balances and recovers elegantly if a worker is killed halfway through a render, or indeed if another joins whilst a render is in progress.
It has no bells and whistles with regard to zooming and panning, so as to keep the code as simple as possible.
Running a worker is as easy as the commands below. The IP and port given to the worker are the IP address of the GUI and the LOCAL port which the worker will use for listening. If you want to run many workers on the same server, give each a unique port number.
# export CLASSPATH=.:/usr/local/scala/lib/scala-library.jar
# cd MandelbrotDistributed/build/classes
# java mandelbrotdistributed/MandelWorker 192.168.1.137 4000
"another worker on the same server"
# java mandelbrotdistributed/MandelWorker 192.168.1.137 4001
This is the entire code:
/*
 * Raster.scala
 */
package mandelbrotdistributed

/**
 * One horizontal line of rendered pixel values.
 *
 * Marked serializable so that an instance can be carried inside a message
 * between processes.
 *
 * @param xlen number of pixels allocated for the line
 */
@serializable
class Raster(xlen: Int) {

  /** Per-pixel values for this line; every entry starts out as 0. */
  var line: Array[Int] = new Array[Int](xlen)

  /** Number of pixels currently held in `line`. */
  def width: Int = line.length
}
/*
 * Complex.scala
 */
package mandelbrotdistributed

/**
 * Immutable complex number `a + bi`.
 *
 * @param a real part
 * @param b imaginary part
 */
class Complex(val a: Double, val b: Double) {

  /** Modulus of this number: sqrt(a^2 + b^2). */
  def abs(): Double = {
    val squaredNorm = a * a + b * b
    Math.sqrt(squaredNorm)
  }

  /** Complex product: (a,b)(c,d) = (ac - bd, bc + ad). */
  def *(that: Complex): Complex = {
    val real = a * that.a - b * that.b
    val imaginary = b * that.a + a * that.b
    new Complex(real, imaginary)
  }

  /** Component-wise sum. */
  def +(that: Complex): Complex = {
    val real = a + that.a
    val imaginary = b + that.b
    new Complex(real, imaginary)
  }

  /** Renders as "<a> + <b>i"; the sign of b is not special-cased. */
  override def toString: String = "" + a + " + " + b + "i"
}
/*
 * MandelWorker.scala
 */
package mandelbrotdistributed

import scala.actors.Actor._
import scala.actors.remote._
import scala.actors.remote.RemoteActor.select
import java.lang.Math
import java.net.InetAddress

/** Heartbeat sent by a worker to the GUI; carries the worker's own locator. */
case class Register(me: Locator)

/** A finished row: the locator of the worker that produced it, the row index and its pixels. */
case class RenderedLine(m: Locator, row: Int, raster: Raster)

/** Request to render one row of a width x height image with the given iteration limit. */
case class RenderAction(row: Int, width: Int, height: Int, level: Int)

/**
 * Periodic timer message.
 *
 * Declared as a case object: a case class without a parameter list is
 * deprecated and later rejected by the compiler. `Tick` is still usable both
 * as a value to send and as a pattern to match.
 */
case object Tick

/**
 * A render worker: starts an actor that answers `RenderAction` requests and
 * schedules a periodic `Register` heartbeat to the GUI.
 *
 * @param me        locator under which this worker registers itself
 * @param clientLoc locator of the GUI actor that receives the heartbeats
 */
class MandelActor(me: Locator, clientLoc: Locator) {
  // NOTE(review): presumably needed so that remote messages are deserialized
  // with the class loader that knows the message classes - confirm.
  RemoteActor.classLoader = getClass().getClassLoader()

  actor {
    println("Worker Ready")
    RemoteActor.alive(me.node.port)
    RemoteActor.register(me.name, self)
    loop {
      react {
        case RenderAction(row: Int, width: Int, height: Int, level: Int) =>
          println("Raster row " + row)
          // Reply to whoever asked, tagging the result with our own locator.
          sender ! RenderedLine(me, row, generate(width, height, row, level))
        case msg =>
          println("Unhandled message: " + msg)
      }
    }
  }

  // Register with the GUI every 5 secs (heartbeat)
  val client = select(clientLoc.node, clientLoc.name)
  ActorPing.scheduleAtFixedRate(client, Register(me), 0L, 5000L)

  /**
   * Repeatedly applies z -> z*z + c, starting from `z` with counter `i`,
   * until the modulus of z exceeds 2 or the counter exceeds `level`.
   *
   * Written as a loop rather than self-recursion: this method is neither
   * final nor private, so the compiler would not eliminate the tail call and
   * every pixel could cost up to `level + 2` stack frames.
   *
   * @param z     starting value
   * @param c     constant added on every step
   * @param level iteration limit
   * @param i     starting value of the iteration counter
   * @return the last z computed and the counter value at which iteration stopped
   */
  def iterate(z: Complex, c: Complex, level: Int, i: Int): (Complex, Int) = {
    var current = z
    var count = i
    while (!(current.abs > 2 || count > level)) {
      current = current * current + c
      count += 1
    }
    (current, count)
  }

  /**
   * Renders one row of the image.
   *
   * The row index is mapped to y = -1.5 + row*3.0/height and each column to
   * x = -2.0 + x0*3.0/width. A pixel is 0 when the final z has modulus below
   * 2, otherwise it is the iteration counter returned by `iterate`.
   *
   * @param width  number of pixels in the row
   * @param height total number of rows in the image
   * @param row    index of the row to render
   * @param level  iteration limit passed to `iterate`
   * @return a `Raster` of `width` pixels
   */
  def generate(width: Int, height: Int, row: Int, level: Int): Raster = {
    val raster = new Raster(width)
    val y = -1.5 + row * 3.0 / height
    for { x0 <- 0 until width } {
      val x = -2.0 + x0 * 3.0 / width
      val (z, i) = iterate(new Complex(0, 0), new Complex(x, y), level, 0)
      raster.line(x0) = if (z.abs < 2) 0 else i
    }
    raster
  }
}

/** Command-line entry point for a worker process. */
object MandelWorker {
  /**
   * @param args arg0: remote IP of where the MandelGUI program is running
   *             (default "127.0.0.1"); arg1: a local port for the mandel
   *             worker (default 9010)
   */
  def main(args: Array[String]): Unit = {
    val host = if (args.length >= 1) args(0) else "127.0.0.1"
    val port = if (args.length >= 2) args(1).toInt else 9010
    // The GUI is always addressed on port 9999 under the name 'MandelGUI.
    val gui = Locator(Node(host, 9999), 'MandelGUI)
    val me = new Locator(Node(InetAddress.getLocalHost.getHostAddress, port), 'MandelWorker)
    new MandelActor(me, gui)
  }
}
/*
 * MandelGui.scala
 */
package mandelbrotdistributed

import scala.swing._
import scala.swing.event.ButtonClicked
import scala.actors.{Actor,OutputChannel}
import scala.actors.Actor._
import scala.actors.remote.{RemoteActor,Locator,Node}
import scala.actors.remote.RemoteActor._
import scala.collection.mutable.Stack
import java.awt.image.BufferedImage
import java.awt.{Graphics,Graphics2D}
import java.awt.Color

/**
 * Swing front end and work coordinator: shows the image, keeps track of the
 * registered workers and hands out one image row at a time to them.
 */
object MandelGui extends SimpleGUIApplication {
  /** Off-screen image that rendered rows are written into. */
  val img = new BufferedImage(480, 480, BufferedImage.TYPE_INT_RGB)

  // Referencing Mandel here initialises it, which starts its actors.
  val mandel = Mandel

  /** Panel that paints `img`. */
  val drawing = new Panel {
    background = Color.black
    preferredSize = (img.getWidth, img.getHeight)
    override def paintComponent(g: Graphics): Unit = {
      g.asInstanceOf[Graphics2D].drawImage(img, null, 0, 0)
    }
  }

  /** Fills the whole image with black and asks the panel to repaint. */
  def clearDrawing() {
    val g = img.getGraphics
    try {
      g.setColor(Color.BLACK)
      g.fillRect(0, 0, img.getWidth, img.getHeight)
    } finally {
      // Release the graphics context instead of leaving it to finalisation.
      g.dispose()
    }
    // Show the cleared image right away rather than when the next row arrives.
    drawing.repaint
  }

  /** Main window: a row of Start / Stop / Continue buttons above the drawing. */
  def top = new MainFrame {
    title = "Mandelbrot"
    contents = new BorderPanel {
      val control = new BoxPanel(Orientation.Horizontal) {
        val start = new Button { text = "Start" }
        val stop = new Button { text = "Stop" }
        val continue = new Button { text = "Continue" }
        contents.append(start, stop, continue)
        listenTo(start, stop, continue)
        reactions += {
          case ButtonClicked(`start`) =>
            clearDrawing
            Mandel.startup
          case ButtonClicked(`stop`) => Mandel.shutdown
          case ButtonClicked(`continue`) => Mandel.process
        }
      }
      import BorderPanel.Position._
      layout(control) = North
      layout(drawing) = Center
    }
  }

  /** Registry of the workers currently considered alive. */
  object WorkerMgmt {
    private var allWorkers: List[Worker] = List()
    val defaultTTL = 6 //sweeps a worker can survive without a register

    /** Applies `op` to every registered worker. */
    def foreach(op: Worker => Unit) = allWorkers.foreach(op)

    /**
     * Returns the first worker whose last assigned row is `row`.
     * Throws if no such worker exists (indexing an empty list).
     */
    def findWorkerForRow(row: Int): Worker = {
      allWorkers.filter(w => w.row == row)(0)
    }

    /** Returns the worker registered under locator `m`, or null if there is none. */
    def find(m: Locator): Worker =
      allWorkers.find(w => w.loc == m) match {
        case Some(known) => known
        case None => null
      }

    /**
     * Adds a worker for `m` if it is not yet known, and resets its
     * time-to-live in either case.
     *
     * @return the (new or existing) worker
     */
    def register(m: Locator): Worker = {
      var worker = find(m)
      if (worker == null) {
        worker = new Worker(m, defaultTTL)
        allWorkers = worker :: allWorkers
      }
      worker.keepAlive
    }

    /**
     * Decrements every worker's time-to-live and drops those that fell below 0.
     *
     * Uses `partition`, not `span`: `span` stops at the first expired worker
     * and would also drop every live worker that happens to follow it in the
     * list.
     *
     * @return the workers that were removed
     */
    def sweep() = {
      allWorkers.foreach(_.decTTL)
      val (ok, expired) = allWorkers partition (_.ttl >= 0)
      allWorkers = ok
      expired
    }
  }

  /**
   * GUI-side handle for one remote worker.
   *
   * @param loc        locator of the remote worker actor
   * @param defaultTTL value the time-to-live is reset to by `keepAlive`
   */
  class Worker(val loc: Locator, val defaultTTL: Int) {
    /** Row most recently handed to this worker (0 until the first `render`). */
    var row: Int = 0
    /** Remaining sweeps before the worker is dropped. */
    var ttl: Int = 0
    val actor = select(loc.node, loc.name)
    val iterationDepth = 2048

    def decTTL = ttl -= 1

    /** Resets the time-to-live and returns this worker. */
    def keepAlive() = {
      ttl = defaultTTL
      this
    }

    /** Remembers `row` as the current assignment and asks the remote actor to render it. */
    def render(row: Int) {
      this.row = row
      actor ! RenderAction(row, img.getWidth, img.getHeight, iterationDepth)
    }

    override def toString = loc.toString
  }

  /** Render coordinator: owns the queue of pending rows and the actors that drive the render. */
  object Mandel {
    object State extends Enumeration {
      val Running, Stopped = Value
    }

    private var state = State.Stopped
    private var workQueue: Stack[Int] = new Stack()

    /** Actor that copies a finished row into `img` as grey shades and repaints. */
    val draw = actor {
      Actor.loop {
        react {
          case (row: Int, raster: Raster) =>
            for (x <- 0 until raster.width) {
              val shade = raster.line(x) % 256
              val rgb = new Color(shade, shade, shade).getRGB
              img.setRGB(x, row, rgb)
            }
            drawing.repaint
        }
      }
    }

    /** Actor that receives worker heartbeats and finished rows and hands out work. */
    val a = actor {
      RemoteActor.alive(9999) // Port
      RemoteActor.register('MandelGUI, Actor.self)

      // Sweep non responsive workers every 2sec
      ActorPing.scheduleAtFixedRate(Actor.self, Tick, 0L, 2000L)

      Actor.loop {
        react {
          case "StartWork" =>
            //print("Start work")
            WorkerMgmt.foreach(farmWork)
          case RenderedLine(m: Locator, row: Int, raster: Raster) =>
            draw ! (row, raster) // Get it on the screen
            if (state == State.Running) farmWork(WorkerMgmt.find(m))
          case Register(m: Locator) =>
            println("Register " + m)
            // Register and assign it work; Immediate load balance
            // NOTE(review): this hands out a row on every heartbeat without
            // checking `state`, and overwrites the worker's remembered row -
            // confirm that this is intended.
            farmWork(WorkerMgmt.register(m))
          case Tick =>
            for (w <- WorkerMgmt.sweep) {
              println("Unregister " + w)
              workQueue.push(w.row) // push their row
            }
          case msg =>
            println("Unhandled message: " + msg)
        }
      }
    }

    /**
     * Gives `worker` the next pending row, or stops the render when no rows
     * are pending. A null worker is ignored.
     */
    def farmWork(worker: Worker) {
      if (workQueue.isEmpty) shutdown
      else {
        if (worker != null) worker.render(workQueue.pop)
      }
    }

    /** Marks the render as stopped. */
    def shutdown() {
      state = State.Stopped
    }

    /** When stopped: refills the queue with every image row and starts processing. */
    def startup() {
      if (state == State.Stopped) {
        workQueue.clear
        for (row <- 0 to img.getHeight - 1) workQueue.push(row)
        process
      }
    }

    /** Marks the render as running and asks the coordinator actor to hand out work. */
    def process() {
      state = State.Running
      a ! "StartWork"
    }
  }
}
/*
 * ActorPing.scala
 */
package mandelbrotdistributed

import java.util.concurrent._
import scala.actors._

// =============================================
/**
 * Pings an actor every X seconds.
 *
 * Borrowed from Scala TIM sample; which borrows from:
 *
 * Code based on code from the ActorPing class in the /lift/ repository (http://liftweb.net).
 * Copyright:
 *
 * (c) 2007 WorldWide Conferencing, LLC
 * Distributed under an Apache License
 * http://www.apache.org/licenses/LICENSE-2.0
 */
object ActorPing {

  /** Builds the scheduler's threads: named "ActorPing" and marked as daemons. */
  private object threadFactory extends ThreadFactory {
    val delegate = Executors.defaultThreadFactory()

    def newThread(r: Runnable): Thread = {
      val pinger: Thread = delegate.newThread(r)
      pinger.setName("ActorPing")
      pinger.setDaemon(true)
      pinger
    }
  }

  /** Single-threaded scheduler shared by all pings. */
  private val service = Executors.newSingleThreadScheduledExecutor(threadFactory)

  /**
   * Sends `msg` to `to` repeatedly at a fixed rate.
   *
   * @param to           actor that receives the message
   * @param msg          message sent on every tick
   * @param initialDelay delay before the first send, in milliseconds
   * @param period       time between sends, in milliseconds
   * @return the future returned by the scheduler for the periodic task
   */
  def scheduleAtFixedRate(to: AbstractActor, msg: Any, initialDelay: Long, period: Long): ScheduledFuture[T] forSome {type T} = {
    val ping = new Runnable {
      def run(): Unit = {
        // println("***ActorPing Event***");
        try {
          to ! msg
        } catch {
          // Anything thrown by the send is printed here and not propagated.
          case failure: Throwable => failure.printStackTrace
        }
      }
    }
    service.scheduleAtFixedRate(ping, initialDelay, period, TimeUnit.MILLISECONDS)
  }
}
The entire work as a NetBeans project: mandelbrotdistributed.zip