From 3a20670487663cfbadea480de6e0322c3055afcf Mon Sep 17 00:00:00 2001 From: Michael Bien <mbien@fh-landshut.de> Date: Wed, 4 May 2011 16:26:46 +0200 Subject: initial import of CLCommandQueuePool and CLTask. --- .../opencl/util/concurrent/CLMultiContextTest.java | 54 ++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java (limited to 'test/com/jogamp/opencl/util') diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java new file mode 100644 index 00000000..8e96dafa --- /dev/null +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -0,0 +1,54 @@ +/* + * Created on Tuesday, May 03 2011 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLContext; +import com.jogamp.opencl.CLDevice; +import com.jogamp.opencl.CLPlatform; +import org.junit.Rule; +import org.junit.rules.MethodRule; +import org.junit.rules.Timeout; +import com.jogamp.opencl.util.CLMultiContext; +import java.util.List; +import org.junit.Test; + +import static org.junit.Assert.*; +import static java.lang.System.*; + +/** + * + * @author Michael Bien + */ +public class CLMultiContextTest { + + @Rule + public MethodRule methodTimeout= new Timeout(10000); + + @Test + public void createMultiContextTest() { + + CLMultiContext mc = CLMultiContext.create(CLPlatform.listCLPlatforms()); + + try{ + List<CLContext> contexts = mc.getContexts(); + List<CLDevice> devices = mc.getDevices(); + + assertFalse(contexts.isEmpty()); + assertFalse(devices.isEmpty()); + + for (CLContext context : contexts) { + out.println(context); + } + for (CLDevice device : devices) { + out.println(device); + } + + }finally{ + mc.release(); + } + + } + + +} -- cgit v1.2.3 From 19cc9195c73002f84c153a1ffc60f00408e1176e Mon Sep 17 00:00:00 2001 From: Michael Bien <mbien@fh-landshut.de> Date: Sat, 7 May 2011 02:11:44 +0200 Subject: introduced CLQueueContext and its factory - WIP. --- .../opencl/util/concurrent/CLCommandQueuePool.java | 82 +++++++++++++--------- .../opencl/util/concurrent/CLQueueContext.java | 52 ++++++++++++++ .../util/concurrent/CLQueueContextFactory.java | 46 ++++++++++++ src/com/jogamp/opencl/util/concurrent/CLTask.java | 5 +- .../opencl/util/concurrent/CLMultiContextTest.java | 35 ++++++++- 5 files changed, 180 insertions(+), 40 deletions(-) create mode 100644 src/com/jogamp/opencl/util/concurrent/CLQueueContext.java create mode 100644 src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java (limited to 'test/com/jogamp/opencl/util') diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index 92828e95..ee6dc86b 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -18,7 +18,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; /** - * A multithreaded pool of OpenCL command queues. + * A multithreaded fixed size pool of OpenCL command queues. * It serves as a multiplexer distributing tasks over N queues. * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s * instead of {@link Callable}s. @@ -26,29 +26,37 @@ import java.util.concurrent.ThreadFactory; */ public class CLCommandQueuePool implements CLResource { - private final List<CLCommandQueue> queues; - private final ExecutorService excecutor; + private final List<CLQueueContext> contexts; + private ExecutorService excecutor; private FinishAction finishAction = FinishAction.DO_NOTHING; - private CLCommandQueuePool(Collection<CLCommandQueue> queues) { - this.queues = Collections.unmodifiableList(new ArrayList<CLCommandQueue>(queues)); - this.excecutor = Executors.newFixedThreadPool(queues.size(), new QueueThreadFactory(this.queues)); + private CLCommandQueuePool(Collection<CLQueueContext> contexts) { + this.contexts = Collections.unmodifiableList(new ArrayList<CLQueueContext>(contexts)); + initExecutor(); } - public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) { - return create(mc.getDevices(), modes); + private void initExecutor() { + this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts)); } - public static CLCommandQueuePool create(Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { + public static CLCommandQueuePool create(CLQueueContextFactory factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { + return create(factory, mc.getDevices(), modes); + } + + public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); for (CLDevice device : devices) { queues.add(device.createCommandQueue(modes)); } - return create(queues); + return create(factory, queues); } - public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) { - return new CLCommandQueuePool(queues); + public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) { + List<CLQueueContext> contexts = new ArrayList<CLQueueContext>(queues.size()); + for (CLCommandQueue queue : queues) { + contexts.add(factory.setup(queue, null)); + } + return new CLCommandQueuePool(contexts); } public <R> Future<R> submit(CLTask<R> task) { @@ -66,18 +74,18 @@ public class CLCommandQueuePool implements CLResource { /** * Calls {@link CLCommandQueue#flush()} on all queues. */ - public void flush() { - for (CLCommandQueue queue : queues) { - queue.flush(); + public void flushQueues() { + for (CLQueueContext context : contexts) { + context.queue.flush(); } } /** * Calls {@link CLCommandQueue#finish()} on all queues. */ - public void finish() { - for (CLCommandQueue queue : queues) { - queue.finish(); + public void finishQueues() { + for (CLQueueContext context : contexts) { + context.queue.finish(); } } @@ -85,16 +93,20 @@ public class CLCommandQueuePool implements CLResource { * Releases all queues. */ public void release() { - for (CLCommandQueue queue : queues) { - queue.finish().release(); - } excecutor.shutdown(); + for (CLQueueContext context : contexts) { + context.queue.finish().release(); + } } /** * Returns the command queues used in this pool. */ public List<CLCommandQueue> getQueues() { + List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(contexts.size()); + for (CLQueueContext context : contexts) { + queues.add(context.queue); + } return queues; } @@ -102,7 +114,7 @@ public class CLCommandQueuePool implements CLResource { * Returns the size of this pool (number of command queues). */ public int getSize() { - return queues.size(); + return contexts.size(); } public FinishAction getFinishAction() { @@ -115,31 +127,31 @@ public class CLCommandQueuePool implements CLResource { @Override public String toString() { - return getClass().getSimpleName()+" [queues: "+queues.size()+" on finish: "+finishAction+"]"; + return getClass().getSimpleName()+" [queues: "+contexts.size()+" on finish: "+finishAction+"]"; } private static class QueueThreadFactory implements ThreadFactory { - private final List<CLCommandQueue> queues; + private final List<CLQueueContext> context; private int index; - private QueueThreadFactory(List<CLCommandQueue> queues) { - this.queues = queues; + private QueueThreadFactory(List<CLQueueContext> queues) { + this.context = queues; this.index = 0; } public synchronized Thread newThread(Runnable r) { - CLCommandQueue queue = queues.get(index); + CLQueueContext queue = context.get(index); return new QueueThread(queue, index++); } } private static class QueueThread extends Thread { - private final CLCommandQueue queue; - public QueueThread(CLCommandQueue queue, int index) { - super("queue-worker-thread-"+index+"["+queue+"]"); - this.queue = queue; + private final CLQueueContext context; + public QueueThread(CLQueueContext context, int index) { + super("queue-worker-thread-"+index+"["+context+"]"); + this.context = context; this.setDaemon(true); } } @@ -155,12 +167,12 @@ public class CLCommandQueuePool implements CLResource { } public T call() throws Exception { - CLCommandQueue queue = ((QueueThread)Thread.currentThread()).queue; - T result = task.run(queue); + CLQueueContext context = ((QueueThread)Thread.currentThread()).context; + T result = task.run(context); if(mode.equals(FinishAction.FLUSH)) { - queue.flush(); + context.queue.flush(); }else if(mode.equals(FinishAction.FINISH)) { - queue.finish(); + context.queue.finish(); } return result; } diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java new file mode 100644 index 00000000..fef0047d --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java @@ -0,0 +1,52 @@ +/* + * Created on Friday, May 06 2011 21:02 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLKernel; +import com.jogamp.opencl.CLProgram; +import com.jogamp.opencl.CLResource; +import java.util.Map; + +/** + * @author Michael Bien + */ +public abstract class CLQueueContext implements CLResource { + + public final CLCommandQueue queue; + + public CLQueueContext(CLCommandQueue queue) { + this.queue = queue; + } + + public CLCommandQueue getQueue() { + return queue; + } + + public static class CLSimpleQueueContext extends CLQueueContext { + + public final CLProgram program; + public final Map<String, CLKernel> kernels; + + public CLSimpleQueueContext(CLCommandQueue queue, CLProgram program) { + super(queue); + this.program = program; + this.kernels = program.createCLKernels(); + } + + public Map<String, CLKernel> getKernels() { + return kernels; + } + + public CLProgram getProgram() { + return program; + } + + public void release() { + throw new UnsupportedOperationException("Not supported yet."); + } + + } + +} diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java new file mode 100644 index 00000000..64fdfbcd --- /dev/null +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java @@ -0,0 +1,46 @@ +/* + * Created onSaturday, May 07 2011 00:40 + */ +package com.jogamp.opencl.util.concurrent; + +import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLProgram; + +/** + * + * @author Michael Bien + */ +public abstract class CLQueueContextFactory<C extends CLQueueContext> { + + /** + * Creates a new queue context for the given queue. + * @param old the old context or null. + */ + public abstract C setup(CLCommandQueue queue, CLQueueContext old); + + + /** + * Creates a simple context factory producing single program contexts. + * @param source sourcecode of a OpenCL program. + */ + public static CLSimpleContextFactory createSimple(String source) { + return new CLSimpleContextFactory(source); + } + + public static class CLSimpleContextFactory extends CLQueueContextFactory<CLQueueContext.CLSimpleQueueContext> { + + private final String source; + + public CLSimpleContextFactory(String source) { + this.source = source; + } + + @Override + public CLQueueContext.CLSimpleQueueContext setup(CLCommandQueue queue, CLQueueContext old) { + CLProgram program = queue.getContext().createProgram(source).build(queue.getDevice()); + return new CLQueueContext.CLSimpleQueueContext(queue, program); + } + + } + +} diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java index ebecb936..ff0f7614 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLTask.java +++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java @@ -3,7 +3,6 @@ */ package com.jogamp.opencl.util.concurrent; -import com.jogamp.opencl.CLCommandQueue; /** * A task executed on a command queue. @@ -12,8 +11,8 @@ import com.jogamp.opencl.CLCommandQueue; public interface CLTask<R> { /** - * Runs the task on a queue and returns its result. + * Runs the task on a queue and returns a result. */ - R run(CLCommandQueue queue); + R run(CLQueueContext context); } diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index 8e96dafa..f076324a 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -6,6 +6,7 @@ package com.jogamp.opencl.util.concurrent; import com.jogamp.opencl.CLContext; import com.jogamp.opencl.CLDevice; import com.jogamp.opencl.CLPlatform; +import com.jogamp.opencl.util.concurrent.CLQueueContextFactory.CLSimpleContextFactory; import org.junit.Rule; import org.junit.rules.MethodRule; import org.junit.rules.Timeout; @@ -22,8 +23,8 @@ import static java.lang.System.*; */ public class CLMultiContextTest { - @Rule - public MethodRule methodTimeout= new Timeout(10000); +// @Rule +// public MethodRule methodTimeout= new Timeout(10000); @Test public void createMultiContextTest() { @@ -50,5 +51,35 @@ public class CLMultiContextTest { } + private final static String programSource = + " // OpenCL Kernel Function for element by element vector addition \n" + + "kernel void vectorAdd(global const int* a, global const int* b, global int* c, int iNumElements) { \n" + + " // get index in global data array \n" + + " int iGID = get_global_id(0); \n" + + " // bound check (equivalent to the limit on a 'for' loop for standard/serial C code \n" + + " if (iGID >= iNumElements) { \n" + + " return; \n" + + " } \n" + + " // add the vector elements \n" + + " c[iGID] = a[iGID] + b[iGID]; \n" + + "} \n"; + + @Test + public void commandQueuePoolTest() { + + CLMultiContext mc = CLMultiContext.create(CLPlatform.listCLPlatforms()); + + try { + + CLSimpleContextFactory factory = CLQueueContextFactory.createSimple(programSource); + CLCommandQueuePool pool = CLCommandQueuePool.create(factory, mc); + + assertTrue(pool.getSize() > 0); + + pool.release(); + }finally{ + mc.release(); + } + } } -- cgit v1.2.3 From dedded707fc70fda3e40cf963d208202f8d6c42b Mon Sep 17 00:00:00 2001 From: Michael Bien <mbien@fh-landshut.de> Date: Sun, 8 May 2011 19:46:28 +0200 Subject: added context switching functionality. --- .../opencl/util/concurrent/CLCommandQueuePool.java | 49 ++++++++++++++++++---- .../opencl/util/concurrent/CLQueueContext.java | 2 +- .../opencl/util/concurrent/CLMultiContextTest.java | 12 +++--- 3 files changed, 46 insertions(+), 17 deletions(-) (limited to 'test/com/jogamp/opencl/util') diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index ee6dc86b..b80f09e6 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -9,7 +9,6 @@ import com.jogamp.opencl.CLResource; import com.jogamp.opencl.util.CLMultiContext; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -26,15 +25,32 @@ import java.util.concurrent.ThreadFactory; */ public class CLCommandQueuePool implements CLResource { - private final List<CLQueueContext> contexts; + private List<CLQueueContext> contexts; private ExecutorService excecutor; private FinishAction finishAction = FinishAction.DO_NOTHING; - private CLCommandQueuePool(Collection<CLQueueContext> contexts) { - this.contexts = Collections.unmodifiableList(new ArrayList<CLQueueContext>(contexts)); + private CLCommandQueuePool(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) { + this.contexts = initContexts(queues, factory); initExecutor(); } + private List<CLQueueContext> initContexts(Collection<CLCommandQueue> queues, CLQueueContextFactory factory) { + List<CLQueueContext> newContexts = new ArrayList<CLQueueContext>(queues.size()); + + int index = 0; + for (CLCommandQueue queue : queues) { + + CLQueueContext old = null; + if(this.contexts != null && !this.contexts.isEmpty()) { + old = this.contexts.get(index++); + old.release(); + } + + newContexts.add(factory.setup(queue, old)); + } + return newContexts; + } + private void initExecutor() { this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts)); } @@ -52,11 +68,7 @@ public class CLCommandQueuePool implements CLResource { } public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) { - List<CLQueueContext> contexts = new ArrayList<CLQueueContext>(queues.size()); - for (CLCommandQueue queue : queues) { - contexts.add(factory.setup(queue, null)); - } - return new CLCommandQueuePool(contexts); + return new CLCommandQueuePool(factory, queues); } public <R> Future<R> submit(CLTask<R> task) { @@ -70,6 +82,20 @@ public class CLCommandQueuePool implements CLResource { } return excecutor.invokeAll(wrapper); } + + /** + * Switches the context of all queues - this operation can be expensive. + * Blocks until all tasks finish and sets up a new context for all queues. + */ + public <C extends CLQueueContext> CLCommandQueuePool switchContext(CLQueueContextFactory<C> factory) { + + excecutor.shutdown(); + finishQueues(); // just to be sure + + contexts = initContexts(getQueues(), factory); + initExecutor(); + return this; + } /** * Calls {@link CLCommandQueue#flush()} on all queues. @@ -96,6 +122,7 @@ public class CLCommandQueuePool implements CLResource { excecutor.shutdown(); for (CLQueueContext context : contexts) { context.queue.finish().release(); + context.release(); } } @@ -121,6 +148,10 @@ public class CLCommandQueuePool implements CLResource { return finishAction; } + /** + * Sets the action which is run after every completed task. + * This is mainly intended for debugging, default value is {@link FinishAction#DO_NOTHING}. + */ public void setFinishAction(FinishAction action) { this.finishAction = action; } diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java index fef0047d..3956f93d 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java @@ -44,7 +44,7 @@ public abstract class CLQueueContext implements CLResource { } public void release() { - throw new UnsupportedOperationException("Not supported yet."); + program.release(); } } diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index f076324a..7a1ed7aa 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -23,8 +23,8 @@ import static java.lang.System.*; */ public class CLMultiContextTest { -// @Rule -// public MethodRule methodTimeout= new Timeout(10000); + @Rule + public MethodRule methodTimeout= new Timeout(10000); @Test public void createMultiContextTest() { @@ -52,15 +52,11 @@ public class CLMultiContextTest { } private final static String programSource = - " // OpenCL Kernel Function for element by element vector addition \n" - + "kernel void vectorAdd(global const int* a, global const int* b, global int* c, int iNumElements) { \n" - + " // get index in global data array \n" + "kernel void vectorAdd(global const int* a, global const int* b, global int* c, int iNumElements) { \n" + " int iGID = get_global_id(0); \n" - + " // bound check (equivalent to the limit on a 'for' loop for standard/serial C code \n" + " if (iGID >= iNumElements) { \n" + " return; \n" + " } \n" - + " // add the vector elements \n" + " c[iGID] = a[iGID] + b[iGID]; \n" + "} \n"; @@ -75,6 +71,8 @@ public class CLMultiContextTest { CLCommandQueuePool pool = CLCommandQueuePool.create(factory, mc); assertTrue(pool.getSize() > 0); + + pool.switchContext(factory); pool.release(); }finally{ -- cgit v1.2.3 From c59bc50229181ab9cb0e5012d7bb5caf2faa781f Mon Sep 17 00:00:00 2001 From: Michael Bien <mbien@fh-landshut.de> Date: Mon, 9 May 2011 03:00:55 +0200 Subject: concurrent utils bugfixes and improvements. - more utility methods - generics fixes - basic junit test for CLCommandQueuePool - javadoc and argument validation --- src/com/jogamp/opencl/util/CLMultiContext.java | 11 +++ .../opencl/util/concurrent/CLCommandQueuePool.java | 69 +++++++++---- .../opencl/util/concurrent/CLQueueContext.java | 9 ++ src/com/jogamp/opencl/util/concurrent/CLTask.java | 4 +- .../opencl/util/concurrent/CLMultiContextTest.java | 107 +++++++++++++++++++-- 5 files changed, 168 insertions(+), 32 deletions(-) (limited to 'test/com/jogamp/opencl/util') diff --git a/src/com/jogamp/opencl/util/CLMultiContext.java b/src/com/jogamp/opencl/util/CLMultiContext.java index f588fcef..f74c0a35 100644 --- a/src/com/jogamp/opencl/util/CLMultiContext.java +++ b/src/com/jogamp/opencl/util/CLMultiContext.java @@ -41,6 +41,13 @@ public class CLMultiContext implements CLResource { * Creates a multi context with all devices of the specified platforms and types. */ public static CLMultiContext create(CLPlatform[] platforms, CLDevice.Type... types) { + + if(platforms == null) { + throw new NullPointerException("platform list was null"); + }else if(platforms.length == 0) { + throw new IllegalArgumentException("platform list was empty"); + } + List<CLDevice> devices = new ArrayList<CLDevice>(); for (CLPlatform platform : platforms) { devices.addAll(asList(platform.listCLDevices(types))); @@ -54,6 +61,10 @@ public class CLMultiContext implements CLResource { */ public static CLMultiContext create(Collection<CLDevice> devices) { + if(devices.isEmpty()) { + throw new IllegalArgumentException("device list was empty"); + } + Map<CLPlatform, List<CLDevice>> platformDevicesMap = filterPlatformConflicts(devices); // create contexts diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index b80f09e6..a6bbe4d0 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -15,6 +15,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; /** * A multithreaded fixed size pool of OpenCL command queues. @@ -23,7 +24,7 @@ import java.util.concurrent.ThreadFactory; * instead of {@link Callable}s. * @author Michael Bien */ -public class CLCommandQueuePool implements CLResource { +public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource { private List<CLQueueContext> contexts; private ExecutorService excecutor; @@ -55,11 +56,11 @@ public class CLCommandQueuePool implements CLResource { this.excecutor = Executors.newFixedThreadPool(contexts.size(), new QueueThreadFactory(contexts)); } - public static CLCommandQueuePool create(CLQueueContextFactory factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { + public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, CLMultiContext mc, CLCommandQueue.Mode... modes) { return create(factory, mc.getDevices(), modes); } - public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { + public static <C extends CLQueueContext> CLCommandQueuePool<C> create(CLQueueContextFactory<C> factory, Collection<CLDevice> devices, CLCommandQueue.Mode... modes) { List<CLCommandQueue> queues = new ArrayList<CLCommandQueue>(devices.size()); for (CLDevice device : devices) { queues.add(device.createCommandQueue(modes)); @@ -67,21 +68,43 @@ public class CLCommandQueuePool implements CLResource { return create(factory, queues); } - public static CLCommandQueuePool create(CLQueueContextFactory factory, Collection<CLCommandQueue> queues) { + public static <C extends CLQueueContext> CLCommandQueuePool create(CLQueueContextFactory<C> factory, Collection<CLCommandQueue> queues) { return new CLCommandQueuePool(factory, queues); } - public <R> Future<R> submit(CLTask<R> task) { + /** + * @see ExecutorService#submit(java.util.concurrent.Callable) + */ + public <R> Future<R> submit(CLTask<? extends C, R> task) { return excecutor.submit(new TaskWrapper(task, finishAction)); } - public <R> List<Future<R>> invokeAll(Collection<CLTask<R>> tasks) throws InterruptedException { - List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size()); - for (CLTask<R> task : tasks) { - wrapper.add(new TaskWrapper<R>(task, finishAction)); - } + /** + * @see ExecutorService#invokeAll(java.util.Collection) + */ + public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException { + List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); return excecutor.invokeAll(wrapper); } + + /** + * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit) + */ + public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { + List<TaskWrapper<C, R>> wrapper = wrapTasks(tasks); + return excecutor.invokeAll(wrapper, timeout, unit); + } + + private <R> List<TaskWrapper<C, R>> wrapTasks(Collection<? extends CLTask<? super C, R>> tasks) { + List<TaskWrapper<C, R>> wrapper = new ArrayList<TaskWrapper<C, R>>(tasks.size()); + for (CLTask<? super C, R> task : tasks) { + if(task == null) { + throw new NullPointerException("at least one task was null"); + } + wrapper.add(new TaskWrapper<C, R>(task, finishAction)); + } + return wrapper; + } /** * Switches the context of all queues - this operation can be expensive. @@ -171,35 +194,41 @@ public class CLCommandQueuePool implements CLResource { this.index = 0; } - public synchronized Thread newThread(Runnable r) { + public synchronized Thread newThread(Runnable runnable) { + + SecurityManager sm = System.getSecurityManager(); + ThreadGroup group = (sm != null)? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); + CLQueueContext queue = context.get(index); - return new QueueThread(queue, index++); + QueueThread thread = new QueueThread(group, runnable, queue, index++); + thread.setDaemon(true); + + return thread; } } private static class QueueThread extends Thread { private final CLQueueContext context; - public QueueThread(CLQueueContext context, int index) { - super("queue-worker-thread-"+index+"["+context+"]"); + public QueueThread(ThreadGroup group, Runnable runnable, CLQueueContext context, int index) { + super(group, runnable, "queue-worker-thread-"+index+"["+context+"]"); this.context = context; - this.setDaemon(true); } } - private static class TaskWrapper<T> implements Callable<T> { + private static class TaskWrapper<C extends CLQueueContext, R> implements Callable<R> { - private final CLTask<T> task; + private final CLTask<? super C, R> task; private final FinishAction mode; - public TaskWrapper(CLTask<T> task, FinishAction mode) { + public TaskWrapper(CLTask<? super C, R> task, FinishAction mode) { this.task = task; this.mode = mode; } - public T call() throws Exception { + public R call() throws Exception { CLQueueContext context = ((QueueThread)Thread.currentThread()).context; - T result = task.run(context); + R result = task.execute((C)context); if(mode.equals(FinishAction.FLUSH)) { context.queue.flush(); }else if(mode.equals(FinishAction.FINISH)) { diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java index 3956f93d..11b86889 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java @@ -4,6 +4,7 @@ package com.jogamp.opencl.util.concurrent; import com.jogamp.opencl.CLCommandQueue; +import com.jogamp.opencl.CLContext; import com.jogamp.opencl.CLKernel; import com.jogamp.opencl.CLProgram; import com.jogamp.opencl.CLResource; @@ -24,6 +25,10 @@ public abstract class CLQueueContext implements CLResource { return queue; } + public CLContext getCLContext() { + return queue.getContext(); + } + public static class CLSimpleQueueContext extends CLQueueContext { public final CLProgram program; @@ -39,6 +44,10 @@ public abstract class CLQueueContext implements CLResource { return kernels; } + public CLKernel getKernel(String name) { + return kernels.get(name); + } + public CLProgram getProgram() { return program; } diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java index ff0f7614..0cfd24a5 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLTask.java +++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java @@ -8,11 +8,11 @@ package com.jogamp.opencl.util.concurrent; * A task executed on a command queue. * @author Michael Bien */ -public interface CLTask<R> { +public interface CLTask<C extends CLQueueContext, R> { /** * Runs the task on a queue and returns a result. */ - R run(CLQueueContext context); + R execute(C context); } diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index 7a1ed7aa..e5bcb1c5 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -3,14 +3,24 @@ */ package com.jogamp.opencl.util.concurrent; +import com.jogamp.common.nio.Buffers; +import com.jogamp.opencl.CLBuffer; +import com.jogamp.opencl.CLCommandQueue; import com.jogamp.opencl.CLContext; import com.jogamp.opencl.CLDevice; +import com.jogamp.opencl.CLKernel; import com.jogamp.opencl.CLPlatform; +import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSimpleQueueContext; import com.jogamp.opencl.util.concurrent.CLQueueContextFactory.CLSimpleContextFactory; +import java.nio.IntBuffer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.junit.Rule; import org.junit.rules.MethodRule; import org.junit.rules.Timeout; import com.jogamp.opencl.util.CLMultiContext; +import java.nio.Buffer; +import java.util.ArrayList; import java.util.List; import org.junit.Test; @@ -52,27 +62,97 @@ public class CLMultiContextTest { } private final static String programSource = - "kernel void vectorAdd(global const int* a, global const int* b, global int* c, int iNumElements) { \n" - + " int iGID = get_global_id(0); \n" - + " if (iGID >= iNumElements) { \n" - + " return; \n" - + " } \n" - + " c[iGID] = a[iGID] + b[iGID]; \n" - + "} \n"; + "kernel void increment(global int* array, int numElements) { \n" + + " int index = get_global_id(0); \n" + + " if (index >= numElements) { \n" + + " return; \n" + + " } \n" + + " array[index]++; \n" + + "} \n"; + + private final class CLTestTask implements CLTask<CLSimpleQueueContext, Buffer> { + + private final Buffer data; + + public CLTestTask(Buffer buffer) { + this.data = buffer; + } + + public Buffer execute(CLSimpleQueueContext qc) { + + CLCommandQueue queue = qc.getQueue(); + CLContext context = qc.getCLContext(); + CLKernel kernel = qc.getKernel("increment"); + + CLBuffer<Buffer> buffer = null; + try{ + buffer = context.createBuffer(data); + int gws = buffer.getCLCapacity(); + + kernel.putArg(buffer).putArg(gws).rewind(); + + queue.putWriteBuffer(buffer, true); + queue.put1DRangeKernel(kernel, 0, gws, 0); + queue.putReadBuffer(buffer, true); + }finally{ + if(buffer != null) { + buffer.release(); + } + } + + return data; + } + + } @Test - public void commandQueuePoolTest() { + public void commandQueuePoolTest() throws InterruptedException, ExecutionException { CLMultiContext mc = CLMultiContext.create(CLPlatform.listCLPlatforms()); try { CLSimpleContextFactory factory = CLQueueContextFactory.createSimple(programSource); - CLCommandQueuePool pool = CLCommandQueuePool.create(factory, mc); + CLCommandQueuePool<CLSimpleQueueContext> pool = CLCommandQueuePool.create(factory, mc); assertTrue(pool.getSize() > 0); + + final int slice = 64; + final int tasksPerQueue = 10; + final int taskCount = pool.getSize() * tasksPerQueue; + + IntBuffer data = Buffers.newDirectIntBuffer(slice*taskCount); + + List<CLTestTask> tasks = new ArrayList<CLTestTask>(taskCount); + + for (int i = 0; i < taskCount; i++) { + IntBuffer subBuffer = Buffers.slice(data, i*slice, slice); + assertEquals(slice, subBuffer.capacity()); + tasks.add(new CLTestTask(subBuffer)); + } + + out.println("invoking "+tasks.size()+" tasks on "+pool.getSize()+" queues"); + + pool.invokeAll(tasks); + checkBuffer(1, data); + + + for (CLTestTask task : tasks) { + pool.submit(task).get(); + } + checkBuffer(2, data); + + + List<Future<Buffer>> futures = new ArrayList<Future<Buffer>>(taskCount); + for (CLTestTask task : tasks) { + futures.add(pool.submit(task)); + } + for (Future<Buffer> future : futures) { + future.get(); + } + checkBuffer(3, data); - pool.switchContext(factory); +// pool.switchContext(factory); pool.release(); }finally{ @@ -80,4 +160,11 @@ public class CLMultiContextTest { } } + private void checkBuffer(int expected, IntBuffer data) { + while(data.hasRemaining()) { + assertEquals(expected, data.get()); + } + data.rewind(); + } + } -- cgit v1.2.3 From 1c38b7ef96910260b64843214279ac4683005609 Mon Sep 17 00:00:00 2001 From: Michael Bien <mbien@fh-landshut.de> Date: Mon, 9 May 2011 17:30:19 +0200 Subject: added submitAll() utility method junit test now covering queue contexts switching improved javadoc. --- .../opencl/util/concurrent/CLCommandQueuePool.java | 26 +++++++++++++---- .../opencl/util/concurrent/CLQueueContext.java | 11 +++++++ .../util/concurrent/CLQueueContextFactory.java | 13 ++++++--- .../opencl/util/concurrent/CLMultiContextTest.java | 34 ++++++++++++---------- 4 files changed, 59 insertions(+), 25 deletions(-) (limited to 'test/com/jogamp/opencl/util') diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java index a6bbe4d0..9ea960ae 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java +++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java @@ -18,10 +18,10 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** - * A multithreaded fixed size pool of OpenCL command queues. - * It serves as a multiplexer distributing tasks over N queues. + * A multithreaded, fixed size pool of OpenCL command queues. + * It serves as a multiplexer distributing tasks over N queues usually run on N devices. * The usage of this pool is similar to {@link ExecutorService} but it uses {@link CLTask}s - * instead of {@link Callable}s. + * instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue. * @author Michael Bien */ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource { @@ -73,13 +73,27 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } /** + * Submits this task to the pool for execution returning its {@link Future}. * @see ExecutorService#submit(java.util.concurrent.Callable) */ - public <R> Future<R> submit(CLTask<? extends C, R> task) { + public <R> Future<R> submit(CLTask<? super C, R> task) { return excecutor.submit(new TaskWrapper(task, finishAction)); } /** + * Submits all tasks to the pool for execution and returns their {@link Future}. + * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task. + */ + public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? super C, R>> tasks) { + List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size()); + for (CLTask<? super C, R> task : tasks) { + futures.add(submit(task)); + } + return futures; + } + + /** + * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. * @see ExecutorService#invokeAll(java.util.Collection) */ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks) throws InterruptedException { @@ -88,6 +102,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource } /** + * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result. * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit) */ public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? super C, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException { @@ -109,6 +124,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource /** * Switches the context of all queues - this operation can be expensive. * Blocks until all tasks finish and sets up a new context for all queues. + * @return this */ public <C extends CLQueueContext> CLCommandQueuePool switchContext(CLQueueContextFactory<C> factory) { @@ -197,7 +213,7 @@ public class CLCommandQueuePool<C extends CLQueueContext> implements CLResource public synchronized Thread newThread(Runnable runnable) { SecurityManager sm = System.getSecurityManager(); - ThreadGroup group = (sm != null)? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); + ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); CLQueueContext queue = context.get(index); QueueThread thread = new QueueThread(group, runnable, queue, index++); diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java index 11b86889..3f89ad0e 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContext.java @@ -11,6 +11,13 @@ import com.jogamp.opencl.CLResource; import java.util.Map; /** + * Superclass for all per-queue contexts as used in {@link CLCommandQueuePool}s. + * A context will usually hold queue (and therefore often device) specific resources used + * in tasks of the same queue. + * <p> + * Possible candidates for those resources can be compiled CLPrograms, CLKernels + * or even pre allocated CLBuffers. + * </p> * @author Michael Bien */ public abstract class CLQueueContext implements CLResource { @@ -29,6 +36,10 @@ public abstract class CLQueueContext implements CLResource { return queue.getContext(); } + /** + * A simple queue context holding a precompiled program and its kernels. + * @author Michael Bien + */ public static class CLSimpleQueueContext extends CLQueueContext { public final CLProgram program; diff --git a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java b/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java index 64fdfbcd..58f389bf 100644 --- a/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java +++ b/src/com/jogamp/opencl/util/concurrent/CLQueueContextFactory.java @@ -5,9 +5,10 @@ package com.jogamp.opencl.util.concurrent; import com.jogamp.opencl.CLCommandQueue; import com.jogamp.opencl.CLProgram; +import com.jogamp.opencl.util.concurrent.CLQueueContext.CLSimpleQueueContext; /** - * + * Creates {@link CLQueueContext}s. * @author Michael Bien */ public abstract class CLQueueContextFactory<C extends CLQueueContext> { @@ -27,7 +28,11 @@ public abstract class CLQueueContextFactory<C extends CLQueueContext> { return new CLSimpleContextFactory(source); } - public static class CLSimpleContextFactory extends CLQueueContextFactory<CLQueueContext.CLSimpleQueueContext> { + /** + * Creates {@link CLSimpleQueueContext}s containing a precompiled program. + * @author Michael Bien + */ + public static class CLSimpleContextFactory extends CLQueueContextFactory<CLSimpleQueueContext> { private final String source; @@ -36,9 +41,9 @@ public abstract class CLQueueContextFactory<C extends CLQueueContext> { } @Override - public CLQueueContext.CLSimpleQueueContext setup(CLCommandQueue queue, CLQueueContext old) { + public CLSimpleQueueContext setup(CLCommandQueue queue, CLQueueContext old) { CLProgram program = queue.getContext().createProgram(source).build(queue.getDevice()); - return new CLQueueContext.CLSimpleQueueContext(queue, program); + return new CLSimpleQueueContext(queue, program); } } diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java index e5bcb1c5..81d34907 100644 --- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java +++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java @@ -62,13 +62,13 @@ public class CLMultiContextTest { } private final static String programSource = - "kernel void increment(global int* array, int numElements) { \n" - + " int index = get_global_id(0); \n" - + " if (index >= numElements) { \n" - + " return; \n" - + " } \n" - + " array[index]++; \n" - + "} \n"; + "kernel void compute(global int* array, int numElements) { \n" + + " int index = get_global_id(0); \n" + + " if (index >= numElements) { \n" + + " return; \n" + + " } \n" + + " array[index]++; \n" + + "} \n"; private final class CLTestTask implements CLTask<CLSimpleQueueContext, Buffer> { @@ -82,7 +82,7 @@ public class CLMultiContextTest { CLCommandQueue queue = qc.getQueue(); CLContext context = qc.getCLContext(); - CLKernel kernel = qc.getKernel("increment"); + CLKernel kernel = qc.getKernel("compute"); CLBuffer<Buffer> buffer = null; try{ @@ -133,26 +133,28 @@ public class CLMultiContextTest { out.println("invoking "+tasks.size()+" tasks on "+pool.getSize()+" queues"); + // blocking invoke pool.invokeAll(tasks); checkBuffer(1, data); - + // submit blocking emediatly for (CLTestTask task : tasks) { pool.submit(task).get(); } checkBuffer(2, data); - - List<Future<Buffer>> futures = new ArrayList<Future<Buffer>>(taskCount); - for (CLTestTask task : tasks) { - futures.add(pool.submit(task)); - } + // submitAll using futures + List<Future<Buffer>> futures = pool.submitAll(tasks); for (Future<Buffer> future : futures) { future.get(); } checkBuffer(3, data); - -// pool.switchContext(factory); + + // switching contexts using different program + factory = CLQueueContextFactory.createSimple(programSource.replaceAll("\\+\\+", "--")); + pool.switchContext(factory); + pool.invokeAll(tasks); + checkBuffer(2, data); pool.release(); }finally{ -- cgit v1.2.3