Discussion:
developers-list Digest, Vol 3, Issue 4
developers-list-request-d/
2012-11-08 11:00:01 UTC
Permalink
This message was forwarded from developers-list-d/***@public.gmane.org. The MonetDB
mailing lists have moved to monetdb.org. Please subscribe to
developers-list-d/***@public.gmane.org, and unsubscribe from this list.
See: http://mail.monetdb.org/mailman/listinfo/developers-list

Send developers-list mailing list submissions to
developers-list-d/***@public.gmane.org

To subscribe or unsubscribe via the World Wide Web, visit
http://mail.monetdb.org/mailman/listinfo/developers-list
or, via email, send a message with subject or body 'help' to
developers-list-request-d/***@public.gmane.org

You can reach the person managing the list at
developers-list-owner-d/***@public.gmane.org

When replying, please edit your Subject line so it is more specific
than "Re: Contents of developers-list digest..."


Today's Topics:

1. Re: MonetDB: default - Low-level task scheduler.
(Sjoerd Mullender)


----------------------------------------------------------------------

Message: 1
Date: Wed, 07 Nov 2012 17:17:55 +0100
From: Sjoerd Mullender <sjoerd-d/***@public.gmane.org>
To: developers-list-d/***@public.gmane.org
Subject: Re: MonetDB: default - Low-level task scheduler.
Message-ID: <509A89B3.6040808-d/***@public.gmane.org>
Content-Type: text/plain; charset=UTF-8

There are a few things wrong with this code:

- sz = ((sz << 1) >> 1); does *not* turn sz into a multiple of two (as
suggested by the comment). This statement is basically a no-op.
- when you create joinable threads, you should join them.
- it's not a great idea to use assert to make sure that GDKmalloc
succeeds. Better is to return an error.
- The include of monet_options.h should be in the C file, not in the
include file (which would be included elsewhere where monet_options.h
will already be included).
Changeset: 5ff3c16e865f for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5ff3c16e865f
gdk/gdk_mapreduce.c
gdk/gdk_mapreduce.h
gdk/Makefile.ag
monetdb5/modules/mal/groups.c
Branch: default
Low-level task scheduler.
This module provides a lightweight map-reduce scheduler for multicore systems.
A limited number of workers are initialized upfront, which take the tasks
from a central queue. The header of these task descriptors should comply
with the MRtask structure.
diff --git a/gdk/Makefile.ag b/gdk/Makefile.ag
--- a/gdk/Makefile.ag
+++ b/gdk/Makefile.ag
@@ -36,7 +36,7 @@ lib_gdk = {
gdk_private.h gdk_delta.h gdk_logger.h gdk_posix.h \
gdk_system.h gdk_tm.h gdk_storage.h \
gdk_calc.c gdk_calc.h gdk_calc_compare.h gdk_calc_private.h \
- gdk_aggr.c gdk_group.c \
+ gdk_aggr.c gdk_group.c gdk_mapreduce.c gdk_mapreduce.h \
bat.feps bat1.feps bat2.feps \
libbat.rc
LIBS = ../common/options/libmoptions \
diff --git a/gdk/gdk_mapreduce.c b/gdk/gdk_mapreduce.c
new file mode 100644
--- /dev/null
+++ b/gdk/gdk_mapreduce.c
@@ -0,0 +1,141 @@
+/*
+ * The contents of this file are subject to the MonetDB Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.monetdb.org/Legal/MonetDBLicense
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ * License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Original Code is the MonetDB Database System.
+ *
+ * The Initial Developer of the Original Code is CWI.
+ * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
+ * Copyright August 2008-2012 MonetDB B.V.
+ * All Rights Reserved.
+ */
+
+/*
+ * (co) Martin L. Kersten
+ * This module provides a lightweight map-reduce scheduler for multicore systems.
+ * A limited number of workers are initialized upfront, which take the tasks
+ * from a central queue. The header of these task descriptors should comply
+ * with the MRtask structure.
+ *
+ */
+#include "monetdb_config.h"
+#include "gdk.h"
+#include "gdk_mapreduce.h"
+
+/* Each queue entry holds one task list submitted by a single MRschedule()
+ * call; workers hand out the tasks of a list one at a time via index. */
+typedef struct MRQUEUE {
+ MRtask **tasks; /* the task array passed to MRschedule() */
+ int index; /* next task in tasks[] to hand out */
+ int size; /* number of tasks in tasks[] */
+} MRqueue;
+
+static MRqueue *mrqueue; /* array of mrqsize entries; NULL until MRqueueCreate() runs */
+static int mrqsize= -1; /* allocated capacity of mrqueue, in entries */
+static int mrqlast= -1; /* entries in use; new lists are pushed/popped at the end (LIFO) */
+static MT_Lock mrqlock; /* held around every update of mrqueue, mrqsize and mrqlast */
+static MT_Sema mrqsema; /* one post per enqueued task; workers wait on it for work */
+
+
+static void MRworker(void *);
+/*
+ * MRqueueCreate: one-time initialisation of the scheduler. Initialises
+ * mrqlock and mrqsema, allocates a zeroed queue of sz entries and starts
+ * GDKnr_threads MRworker threads, all while holding mrqlock.
+ * NOTE(review): the workers are created MT_THR_JOINABLE, but no code in this
+ * file keeps their ids or joins them (MRworker never returns). Any status
+ * returned by MT_create_thread is ignored.
+ */
+static void
+MRqueueCreate(int sz)
+{
+ int i;
+ MT_Id tid;
+
+ MT_lock_init(&mrqlock, "q_create");
+ MT_lock_set(&mrqlock,"q_create");
+ MT_sema_init(&mrqsema, 0, "q_create");
+ sz = ((sz << 1) >> 1); /* NOTE(review): meant to yield a multiple of 2, but
+ * for 0 <= sz < 2^30 (32-bit int) this leaves sz unchanged, and larger
+ * values overflow the signed shift; it is effectively a no-op */
+ mrqueue = (MRqueue*)GDKzalloc(sizeof(MRqueue) *sz);
+ assert(mrqueue); /* NOTE(review): allocation failure is only caught by
+ * assert(), which is disabled under NDEBUG; returning an error would be safer */
+ mrqsize = sz;
+ mrqlast = 0;
+ /* create GDKnr_threads worker threads (the thread count given as a system
+ * parameter); tid is overwritten on every iteration and not kept */
+ for ( i =0; i < GDKnr_threads; i++)
+ MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_JOINABLE);
+ MT_lock_unset(&mrqlock,"q_create");
+}
+/*
+ * MRenqueue: push one list of taskcnt tasks onto the shared queue (doubling
+ * the queue capacity when it is full), then post mrqsema once per task so
+ * that up to taskcnt waiting workers can pick them up.
+ * NOTE(review): taskcnt == 0 is only rejected by assert(); under NDEBUG an
+ * empty entry would be pushed, and a later MRdequeue could read tasks[0] of it.
+ */
+static void
+MRenqueue(int taskcnt, MRtask **tasks)
+{
+ assert(taskcnt > 0);
+ MT_lock_set(&mrqlock, "mrqlock");
+ if (mrqlast == mrqsize) {
+ mrqsize <<= 1;
+ mrqueue = (MRqueue*) GDKrealloc(mrqueue, sizeof(MRqueue) * mrqsize); /* NOTE(review):
+ * result is not checked and overwrites mrqueue directly; if GDKrealloc returns
+ * NULL on failure (as realloc does -- confirm), the old array is lost and the
+ * stores below dereference NULL */
+ }
+ mrqueue[mrqlast].index = 0;
+ mrqueue[mrqlast].tasks = tasks;
+ mrqueue[mrqlast].size = taskcnt;
+ mrqlast++;
+ MT_lock_unset(&mrqlock, "mrqlock");
+ /* a task list was added for consumption: one semaphore post per task */
+ while (taskcnt-- > 0)
+ MT_sema_up(&mrqsema, "mrqsema");
+}
+/*
+ * MRdequeue: block until a task is available and return it. Tasks are taken
+ * from the most recently pushed list (LIFO across lists), in order within
+ * that list; the list's entry is popped once its last task is handed out.
+ */
+static MRtask *
+MRdequeue(void)
+{
+ MRtask *r = NULL;
+ int idx;
+
+ MT_sema_down(&mrqsema, "mrqsema"); /* wait for a posted (not yet taken) task */
+ assert(mrqlast); /* NOTE(review): reads mrqlast without holding mrqlock */
+ MT_lock_set(&mrqlock, "mrqlock");
+ if (mrqlast > 0) {
+ idx = mrqueue[mrqlast-1].index;
+ r = mrqueue[mrqlast-1].tasks[idx++];
+ if ( mrqueue[mrqlast-1].size == idx)
+ mrqlast--; /* list exhausted: pop its entry */
+ else
+ mrqueue[mrqlast-1].index = idx;
+ }
+ MT_lock_unset(&mrqlock, "mrqlock");
+ assert(r); /* NOTE(review): under NDEBUG a NULL r is returned, and MRworker
+ * dereferences it without a check */
+ return r;
+}
+/*
+ * MRworker: body of each worker thread. It repeatedly takes a task, runs
+ * task->cmd on it, and posts the task's completion semaphore. arg is unused.
+ * NOTE(review): the loop never exits, so there is no shutdown path and the
+ * joinable threads can never be joined. task->sema points to a stack variable
+ * of the waiting MRschedule call, which may return as soon as the last post
+ * wakes it -- whether that is safe depends on MT_sema_up's implementation
+ * (confirm).
+ */
+static void
+MRworker(void * arg)
+{
+ MRtask *task;
+ (void) arg;
+ do{ /* runs forever */
+ task= MRdequeue();
+ (task->cmd)(task); /* run the user function on its own task descriptor */
+ MT_sema_up(task->sema, "mrqsema"); /* report completion to MRschedule */
+ } while (1);
+}
+
+/* schedule the tasks and return when all are done.
+ * arg points to taskcnt task descriptors, each of which must start with an
+ * MRtask header; MRschedule overwrites their sema and cmd fields, and each
+ * worker calls cmd with the task descriptor itself as argument. The queue
+ * and worker threads are created lazily on the first call (1024 entries).
+ * NOTE(review): the mrqueue == 0 test is not done under a lock, so two
+ * concurrent first calls could both run MRqueueCreate; the local semaphore
+ * is never destroyed; taskcnt <= 0 is not checked here (MRenqueue only
+ * asserts taskcnt > 0). */
+void
+MRschedule(int taskcnt, void **arg, void (*cmd)(void*p))
+{
+ int i;
+ MT_Sema sema;
+ MRtask **task = (MRtask**) arg;
+
+ if ( mrqueue == 0) /* lazy, unlocked initialisation */
+ MRqueueCreate(1024);
+
+ MT_sema_init(&sema, 0, "q_create"); /* completion counter for this call */
+ for ( i= 0; i < taskcnt; i++){
+ task[i]->sema = & sema;
+ task[i]->cmd = cmd;
+ }
+ MRenqueue(taskcnt,task);
+ /* wait until every task has posted its completion */
+ for ( i= 0; i < taskcnt; i++)
+ MT_sema_down(&sema, "mrqsema");
+}
diff --git a/gdk/gdk_mapreduce.h b/gdk/gdk_mapreduce.h
new file mode 100644
--- /dev/null
+++ b/gdk/gdk_mapreduce.h
@@ -0,0 +1,32 @@
+/*
+ * The contents of this file are subject to the MonetDB Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.monetdb.org/Legal/MonetDBLicense
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ * License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Original Code is the MonetDB Database System.
+ *
+ * The Initial Developer of the Original Code is CWI.
+ * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
+ * Copyright August 2008-2012 MonetDB B.V.
+ * All Rights Reserved.
+ */
+
+#ifndef _GDK_MAPREDUCE_H_ /* NOTE(review): names starting with _ + uppercase are reserved in C */
+#define _GDK_MAPREDUCE_H_
+
+#include <monet_options.h> /* NOTE(review): nothing declared below appears to need this;
+ * it belongs in gdk_mapreduce.c. This header also assumes MT_Sema and gdk_export
+ * are already declared (gdk_mapreduce.c includes gdk.h first) -- confirm */
+/* Header that every task descriptor passed to MRschedule() must start with
+ * (for example, by embedding MRtask as the first member). MRschedule() fills in
+ * both fields before the tasks are queued. */
+typedef struct{
+ MT_Sema *sema; /* set by MRschedule: posted by the worker once this task has run */
+ void (*cmd)(void *); /* set by MRschedule: called by a worker with this task as argument */
+}MRtask;
+/* Run cmd on each of the taskcnt task descriptors in arg (each starting with
+ * an MRtask header) on the worker threads; returns when all have finished. */
+gdk_export void MRschedule(int taskcnt, void **arg, void (*cmd)(void *p));
+
+#endif /* _GDK_MAPREDUCE_H_ */
diff --git a/monetdb5/modules/mal/groups.c b/monetdb5/modules/mal/groups.c
--- a/monetdb5/modules/mal/groups.c
+++ b/monetdb5/modules/mal/groups.c
@@ -66,11 +66,15 @@ GRPmulticolumngroup(Client cntxt, MalBlk
/* sort order may have influences */
/* SF100 Q16 showed < ordering is 2 times faster as > ordering */
for ( i = 3; i< pci->argc; i++)
- for ( j = i+1; j<pci->argc; j++)
- if ( sizes[j] < sizes[i]){
- l = sizes[j]; sizes[j]= sizes[i]; sizes[i]= l;
- bi = bid[j]; bid[j]= bid[i]; bid[i]= bi;
- }
+ for ( j = i+1; j<pci->argc; j++)
+ if ( sizes[j] < sizes[i]){
+ l = sizes[j];
+ sizes[j]= sizes[i];
+ sizes[i]= l;
+ bi = bid[j];
+ bid[j]= bid[i];
+ bid[i]= bi;
+ }
/* for (i=2; i<pci->argc; i++)
mnstr_printf(cntxt->fdout,"# after [%d] "LLFMT"\n",i, sizes[i]); */
@@ -82,8 +86,6 @@ GRPmulticolumngroup(Client cntxt, MalBlk
i = 4;
if (msg == MAL_SUCCEED && pci->argc > 4 )
do {
- if (*ext)
- BBPdecref(*ext, TRUE);
/* early break when there are as many groups as histogram entries */
b = BATdescriptor(*hist);
if ( b ){
@@ -91,8 +93,8 @@ GRPmulticolumngroup(Client cntxt, MalBlk
BBPreleaseref(*hist);
if ( j) break;
}
- if (*hist)
- BBPdecref(*hist, TRUE);
+ BBPdecref(*ext, TRUE);
+ BBPdecref(*hist, TRUE);
/* (grp,ext,hist) := group.subgroupdone(arg,grp) */
oldgrp= *grp;
_______________________________________________
checkin-list mailing list
http://mail.monetdb.org/mailman/listinfo/checkin-list
--
Sjoerd Mullender


------------------------------

_______________________________________________
developers-list mailing list
developers-list-d/***@public.gmane.org
http://mail.monetdb.org/mailman/listinfo/developers-list


End of developers-list Digest, Vol 3, Issue 4
*********************************************
Loading...