From 0eac8180954fdb43a4d1f724d018e4a329390d1c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20M=C3=BCller?= <d_muel20@uni-muenster.de>
Date: Sun, 8 Sep 2019 21:53:04 +0200
Subject: [PATCH] * Code streamlining & cleanup

---
 Grinder/Grinder.pro                           |  10 +-
 Grinder/Version.h                             |   4 +-
 .../ml/barista/tasks/BaristaInferenceTask.cpp |   4 +
 .../ml/barista/tasks/BaristaInferenceTask.h   |   3 +
 Grinder/ml/blocks/InferenceBlock.cpp          |  21 ---
 Grinder/ml/blocks/InferenceBlock.h            |   6 +-
 .../ml/blocks/InferenceBlockResultsCache.cpp  |  28 ++++
 .../ml/blocks/InferenceBlockResultsCache.h    |  32 +++++
 Grinder/ml/processors/InferenceProcessor.cpp  |   7 +-
 .../ml/processors/MachineLearningProcessor.h  |  30 +----
 .../MachineLearningProcessor.impl.h           | 127 +++---------------
 Grinder/ml/processors/TrainingProcessor.cpp   |  12 +-
 Grinder/ml/processors/TrainingProcessor.h     |   3 +-
 Grinder/ml/tasks/MachineLearningTask.cpp      |  16 +++
 Grinder/ml/tasks/MachineLearningTask.h        |  10 ++
 Grinder/pipeline/Block.h                      |   7 +
 Grinder/pipeline/BlockDataCache.cpp           |  42 ++++++
 Grinder/pipeline/BlockDataCache.h             |  39 ++++++
 .../processors/PipelineTaskProcessor.h        |  56 ++++++++
 .../processors/PipelineTaskProcessor.impl.h   | 104 ++++++++++++++
 20 files changed, 389 insertions(+), 172 deletions(-)
 create mode 100644 Grinder/ml/blocks/InferenceBlockResultsCache.cpp
 create mode 100644 Grinder/ml/blocks/InferenceBlockResultsCache.h
 create mode 100644 Grinder/pipeline/BlockDataCache.cpp
 create mode 100644 Grinder/pipeline/BlockDataCache.h
 create mode 100644 Grinder/pipeline/processors/PipelineTaskProcessor.h
 create mode 100644 Grinder/pipeline/processors/PipelineTaskProcessor.impl.h

diff --git a/Grinder/Grinder.pro b/Grinder/Grinder.pro
index 3f94879..450bd0c 100644
--- a/Grinder/Grinder.pro
+++ b/Grinder/Grinder.pro
@@ -459,7 +459,9 @@ SOURCES += \
     engine/processors/ImageTagsProcessor.cpp \
     ml/blocks/InferenceBlock.cpp \
     ml/processors/InferenceProcessor.cpp \
-    util/QtUtils.cpp
+    util/QtUtils.cpp \
+    pipeline/BlockDataCache.cpp \
+    ml/blocks/InferenceBlockResultsCache.cpp
 
 HEADERS += \
 	ui/mainwnd/GrinderWindow.h \
@@ -993,7 +995,11 @@ HEADERS += \
     engine/processors/ImageTagsProcessor.h \
     ml/blocks/InferenceBlock.h \
     ml/processors/InferenceProcessor.h \
-    util/QtUtils.h
+    util/QtUtils.h \
+    pipeline/BlockDataCache.h \
+    ml/blocks/InferenceBlockResultsCache.h \
+    pipeline/processors/PipelineTaskProcessor.h \
+    pipeline/processors/PipelineTaskProcessor.impl.h
 
 FORMS += \
 	ui/mainwnd/GrinderWindow.ui \
diff --git a/Grinder/Version.h b/Grinder/Version.h
index 19cb0a3..2937c61 100644
--- a/Grinder/Version.h
+++ b/Grinder/Version.h
@@ -10,14 +10,14 @@
 
 #define GRNDR_INFO_TITLE		"Grinder"
 #define GRNDR_INFO_COPYRIGHT	"Copyright (c) WWU Muenster"
-#define GRNDR_INFO_DATE			"06.9.2019"
+#define GRNDR_INFO_DATE			"08.9.2019"
 #define GRNDR_INFO_COMPANY		"WWU Muenster"
 #define GRNDR_INFO_WEBSITE		"http://www.uni-muenster.de"
 
 #define GRNDR_VERSION_MAJOR		0
 #define GRNDR_VERSION_MINOR		15
 #define GRNDR_VERSION_REVISION	0
-#define GRNDR_VERSION_BUILD		390
+#define GRNDR_VERSION_BUILD		391
 
 namespace grndr
 {
diff --git a/Grinder/ml/barista/tasks/BaristaInferenceTask.cpp b/Grinder/ml/barista/tasks/BaristaInferenceTask.cpp
index 637da56..760073e 100644
--- a/Grinder/ml/barista/tasks/BaristaInferenceTask.cpp
+++ b/Grinder/ml/barista/tasks/BaristaInferenceTask.cpp
@@ -121,6 +121,7 @@ std::unique_ptr<BaristaMessage> BaristaInferenceTask::handleLoadNetworkMessage(B
 			addLogMessage("", false);
 
 			// The task has been fully initialized
+			_currentImageIndex = 0;
 			emit initializationFinished();
 
 			changeTaskState(InferenceTaskState::InferImages, "Starting inference...");
@@ -162,6 +163,9 @@ std::unique_ptr<BaristaMessage> BaristaInferenceTask::handleInferImageMessage(Ba
 			// If no proper results were emitted, emit empty ones so waiting processors will not stall
 			if (!resultsEmitted)
 				emit inferImageFinished({});
+
+			// Update the task's progress when an image has been inferred
+			setProgress(static_cast<float>(++_currentImageIndex) / static_cast<float>(_imageReferences.size()));
 		}
 		else
 			reportError("\t\tFailed", message);
diff --git a/Grinder/ml/barista/tasks/BaristaInferenceTask.h b/Grinder/ml/barista/tasks/BaristaInferenceTask.h
index 0df63a1..f50a195 100644
--- a/Grinder/ml/barista/tasks/BaristaInferenceTask.h
+++ b/Grinder/ml/barista/tasks/BaristaInferenceTask.h
@@ -77,6 +77,9 @@ namespace grndr
 	private:
 		std::vector<BaristaLoadNetworkMessage::DataInformation> _networkInputs;
 		std::vector<BaristaLoadNetworkMessage::DataInformation> _networkOutputs;
+
+	private:
+		unsigned int _currentImageIndex{0};
 	};
 }
 
diff --git a/Grinder/ml/blocks/InferenceBlock.cpp b/Grinder/ml/blocks/InferenceBlock.cpp
index 530d2e1..09c12a8 100644
--- a/Grinder/ml/blocks/InferenceBlock.cpp
+++ b/Grinder/ml/blocks/InferenceBlock.cpp
@@ -32,27 +32,6 @@ std::unique_ptr<ProcessorBase> InferenceBlock::createProcessor() const
 	return std::make_unique<InferenceProcessor>(this);
 }
 
-cv::Mat InferenceBlock::cachedResult(const ImageReference* imageRef, const ImageTag* imageTag) const
-{
-	auto it = _cachedResults.find(imageRef);
-
-	if (it != _cachedResults.cend())
-	{
-		if (it->second.find(imageTag) != it->second.cend())
-			return it->second[imageTag].object();
-	}
-
-	return {};
-}
-
-void InferenceBlock::cacheResults(const ImageReference* imageRef, const MachineLearningTask::ImageInferenceResults& results) const
-{
-	_cachedResults[imageRef].clear();
-
-	for (auto it : results)
-		_cachedResults[imageRef][it.first] = it.second;
-}
-
 void InferenceBlock::createPorts()
 {
 	MachineLearningBlock::createPorts();
diff --git a/Grinder/ml/blocks/InferenceBlock.h b/Grinder/ml/blocks/InferenceBlock.h
index 4ff615d..70bab52 100644
--- a/Grinder/ml/blocks/InferenceBlock.h
+++ b/Grinder/ml/blocks/InferenceBlock.h
@@ -7,6 +7,7 @@
 #define INFERENCEBLOCK_H
 
 #include "MachineLearningBlock.h"
+#include "InferenceBlockResultsCache.h"
 #include "ml/tasks/MachineLearningTask.h"
 
 namespace grndr
@@ -38,8 +39,7 @@ namespace grndr
 		const ImageTagPortMap& imageTagPorts() const { return _dynamicOutPorts; }
 
 	public:
-		cv::Mat cachedResult(const ImageReference* imageRef, const ImageTag* imageTag) const;
-		void cacheResults(const ImageReference* imageRef, const MachineLearningTask::ImageInferenceResults& results) const;
+		InferenceBlockResultsCache& resultsCache() const { return _resultsCache; }
 
 	protected:
 		virtual void createPorts() override;
@@ -59,7 +59,7 @@ namespace grndr
 		const ImageTags* _inputImageTags{nullptr};
 
 	private:
-		mutable std::map<const ImageReference*, std::map<const ImageTag*, CacheObject<cv::Mat>>> _cachedResults;
+		mutable InferenceBlockResultsCache _resultsCache;
 	};
 }
 
diff --git a/Grinder/ml/blocks/InferenceBlockResultsCache.cpp b/Grinder/ml/blocks/InferenceBlockResultsCache.cpp
new file mode 100644
index 0000000..9f69873
--- /dev/null
+++ b/Grinder/ml/blocks/InferenceBlockResultsCache.cpp
@@ -0,0 +1,28 @@
+/******************************************************************************
+ * File: InferenceBlockResultsCache.cpp
+ * Date: 07.9.2019
+ *****************************************************************************/
+
+#include "Grinder.h"
+#include "InferenceBlockResultsCache.h"
+
+void InferenceBlockResultsCache::cacheResults(const ImageReference* imageRef, const MachineLearningTask::ImageInferenceResults& results)
+{
+	_cachedResults[imageRef].clear();
+
+	for (auto it : results)
+		_cachedResults[imageRef][it.first] = it.second;
+}
+
+cv::Mat InferenceBlockResultsCache::cachedResult(const ImageReference* imageRef, const ImageTag* imageTag) const
+{
+	auto it = _cachedResults.find(imageRef);
+
+	if (it != _cachedResults.cend())
+	{
+		if (it->second.find(imageTag) != it->second.cend())
+			return it->second.at(imageTag).object();
+	}
+
+	return {};
+}
diff --git a/Grinder/ml/blocks/InferenceBlockResultsCache.h b/Grinder/ml/blocks/InferenceBlockResultsCache.h
new file mode 100644
index 0000000..27dbac3
--- /dev/null
+++ b/Grinder/ml/blocks/InferenceBlockResultsCache.h
@@ -0,0 +1,32 @@
+/******************************************************************************
+ * File: InferenceBlockResultsCache.h
+ * Date: 07.9.2019
+ *****************************************************************************/
+
+#ifndef INFERENCEBLOCKRESULTSCACHE_H
+#define INFERENCEBLOCKRESULTSCACHE_H
+
+#include <map>
+#include <opencv2/core.hpp>
+
+#include "core/data/CacheObject.h"
+#include "ml/tasks/MachineLearningTask.h"
+
+namespace grndr
+{
+	class ImageReference;
+	class ImageTag;
+
+	class InferenceBlockResultsCache
+	{
+	public:
+		void cacheResults(const ImageReference* imageRef, const MachineLearningTask::ImageInferenceResults& results);
+
+		cv::Mat cachedResult(const ImageReference* imageRef, const ImageTag* imageTag) const;
+
+	private:
+		std::map<const ImageReference*, std::map<const ImageTag*, CacheObject<cv::Mat>>> _cachedResults;
+	};
+}
+
+#endif
diff --git a/Grinder/ml/processors/InferenceProcessor.cpp b/Grinder/ml/processors/InferenceProcessor.cpp
index 5720711..9c8f140 100644
--- a/Grinder/ml/processors/InferenceProcessor.cpp
+++ b/Grinder/ml/processors/InferenceProcessor.cpp
@@ -68,9 +68,6 @@ void InferenceProcessor::executionPass(EngineExecutionContext& ctx, const Machin
 
 			// Wait for the inference results
 			taskLoop(ctx, &opExecTask, &resultsArrived);
-
-			// The task doesn't know the total image count, so update its progress here
-			_spawnedTask->setProgress(ctx.getActiveImageIndex() / ctx.imageReferences().size());
 		} catch (TaskException& e) {
 			// Forward task exceptions as processor exceptions
 			throwProcessorException(GetExceptionMessage(e.what()));
@@ -86,7 +83,7 @@ void InferenceProcessor::forwardInferenceResults(EngineExecutionContext& ctx, co
 
 	// Cache the results if they aren't empty
 	if (!results.empty())
-		_block->cacheResults(ctx.activeImageReference(), results);
+		_block->resultsCache().cacheResults(ctx.activeImageReference(), results);
 
 	for (auto it : _block->imageTagPorts())
 	{
@@ -102,7 +99,7 @@ void InferenceProcessor::forwardInferenceResults(EngineExecutionContext& ctx, co
 		else
 		{
 			if (useCache)	// Use a previously cached result if available
-				imgData = _block->cachedResult(ctx.activeImageReference(), it.first);
+				imgData = _block->resultsCache().cachedResult(ctx.activeImageReference(), it.first);
 		}
 
 		if (imgData.empty())	// If no result exists, forward an empty one
diff --git a/Grinder/ml/processors/MachineLearningProcessor.h b/Grinder/ml/processors/MachineLearningProcessor.h
index f32069c..20b6875 100644
--- a/Grinder/ml/processors/MachineLearningProcessor.h
+++ b/Grinder/ml/processors/MachineLearningProcessor.h
@@ -6,23 +6,21 @@
 #ifndef MACHINELEARNINGPROCESSOR_H
 #define MACHINELEARNINGPROCESSOR_H
 
-#include "engine/Processor.h"
+#include "pipeline/processors/PipelineTaskProcessor.h"
 #include "ml/tasks/MachineLearningTask.h"
 
 namespace grndr
 {
 	class MachineLearningBlock;
-	class MachineLearningMethodBase;
 
 	template<typename BlockType>
-	class MachineLearningProcessor : public Processor<BlockType>
+	class MachineLearningProcessor : public PipelineTaskProcessor<BlockType, MachineLearningTask, MachineLearningTaskData>
 	{
 		static_assert(std::is_base_of<MachineLearningBlock, BlockType>::value, "BlockType must be derived from MachineLearningBlock");
 
 	protected:
 		static const char* Data_Value_Method;
 		static const char* Data_Value_State;
-		static const char* Data_Value_SpawnedTask;
 
 	protected:
 		enum class SpawnType
@@ -34,35 +32,17 @@ namespace grndr
 	public:
 		MachineLearningProcessor(const Block* block, SpawnType spawnType, bool requiresBatchMode = false);
 
-	public:
-		virtual void execute(EngineExecutionContext& ctx) override;
-
 	protected:
-		virtual void preExecution(EngineExecutionContext& ctx, const MachineLearningMethodBase* method, const MachineLearningTaskData& taskData) { Q_UNUSED(ctx); Q_UNUSED(method); Q_UNUSED(taskData); }
-		virtual void postExecution(EngineExecutionContext& ctx, const MachineLearningMethodBase* method, const MachineLearningTaskData& taskData) { Q_UNUSED(ctx); Q_UNUSED(method); Q_UNUSED(taskData); }
-		virtual void bypassExecution(EngineExecutionContext& ctx) { Q_UNUSED(ctx); }
-
-		virtual void firstExecutionPass(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData) { Q_UNUSED(ctx); Q_UNUSED(taskData); }
-		virtual void executionPass(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData) { Q_UNUSED(ctx); Q_UNUSED(taskData); }
-		virtual void lastExecutionPass(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData) { Q_UNUSED(ctx); Q_UNUSED(taskData); }
-
-		virtual void fillTaskData(EngineExecutionContext& ctx, const MachineLearningMethodBase* method, MachineLearningTaskData& taskData) const;
+		virtual void fillTaskData(EngineExecutionContext& ctx, MachineLearningTaskData& taskData) const override;
 
 	protected:
-		void taskLoop(EngineExecutionContext& ctx, LongOperation* longOp, const bool* waitCondition = nullptr) const;
+		virtual void spawnTask(const MachineLearningTaskData& taskData) override;
 
 	private:
-		void spawnTask(const MachineLearningMethodBase* method, QString state);		
-
-	private:
-		QString getSpawnTypeName(SpawnType type) const;
-
-	protected:
-		std::shared_ptr<MachineLearningTask> _spawnedTask;
+		QString getSpawnTypeName() const;
 
 	private:
 		SpawnType _spawnType{SpawnType::Training};
-		bool _requiresBatchMode{false};
 	};
 }
 
diff --git a/Grinder/ml/processors/MachineLearningProcessor.impl.h b/Grinder/ml/processors/MachineLearningProcessor.impl.h
index 21c489c..9cf296e 100644
--- a/Grinder/ml/processors/MachineLearningProcessor.impl.h
+++ b/Grinder/ml/processors/MachineLearningProcessor.impl.h
@@ -16,158 +16,63 @@ template<typename BlockType>
 const char* MachineLearningProcessor<BlockType>::Data_Value_Method = "Method";
 template<typename BlockType>
 const char* MachineLearningProcessor<BlockType>::Data_Value_State = "State";
-template<typename BlockType>
-const char* MachineLearningProcessor<BlockType>::Data_Value_SpawnedTask = "SpawnedTask";
 
 Q_DECLARE_METATYPE(const MachineLearningMethodBase*)
 Q_DECLARE_METATYPE(const ImageTags*)
 Q_DECLARE_METATYPE(std::shared_ptr<MachineLearningTask>)
 
 template<typename BlockType>
-MachineLearningProcessor<BlockType>::MachineLearningProcessor(const Block* block, MachineLearningProcessor::SpawnType spawnType, bool requiresBatchMode) : Processor<BlockType>(block),
-	_spawnType{spawnType}, _requiresBatchMode{requiresBatchMode}
+MachineLearningProcessor<BlockType>::MachineLearningProcessor(const Block* block, MachineLearningProcessor::SpawnType spawnType, bool requiresBatchMode) : PipelineTaskProcessor<BlockType, MachineLearningTask, MachineLearningTaskData>(block, requiresBatchMode),
+	_spawnType{spawnType}
 {
 
 }
 
 template<typename BlockType>
-void MachineLearningProcessor<BlockType>::execute(EngineExecutionContext& ctx)
-{
-	if (!this->isBlockBypassed())
-	{
-		// Get the used machine learning method and model state
-		const MachineLearningMethodBase* method = this->template portData<const MachineLearningMethodBase*>(ctx, this->_block->methodPort(), Data_Value_Method);
-		QString state = this->template portData<QString>(ctx, this->_block->statePort(), Data_Value_State, false);
-		const ImageTags* imageTags = this->template portData<const ImageTags*>(ctx, this->_block->imageTagsPort(), ImageTagsProcessor::Data_Value_ImageTags);
-
-		// Take care of the machine learning task
-		if (!_requiresBatchMode || ctx.hasExecutionFlag(Engine::ExecutionFlag::Batch))
-		{
-			// Get the task data for machine learning
-			MachineLearningTaskData taskData;
-			taskData.state = state;
-			taskData.imageTags = imageTags;
-			fillTaskData(ctx, method, taskData);
-
-			// Spawn the task when the first image is active
-			if (ctx.isFirstImage())
-			{
-				preExecution(ctx, method, taskData);
-
-				spawnTask(method, state);
-				ctx.persistentData().set(this->_block, Data_Value_SpawnedTask, _spawnedTask);	// Store the spawned task in the persistent context data
-			}
-			else
-			{
-				_spawnedTask = ctx.persistentData().get<std::shared_ptr<MachineLearningTask>>(this->block(), Data_Value_SpawnedTask);	// Retrieve the stored spawned task from the persistent context data
-
-				if (!_spawnedTask)
-					this->throwProcessorException("No machine learning task has been spawned");
-			}
-
-			if (ctx.isFirstImage())
-			{
-				_spawnedTask->processEngineStart(ctx, taskData);
-				firstExecutionPass(ctx, taskData);
-			}
-
-			if (!ctx.wasAborted())	// Only perform passes if not aborted
-			{
-				_spawnedTask->processEnginePass(ctx, taskData);
-				executionPass(ctx, taskData);
-			}
-
-			if (ctx.isLastImage())
-			{
-				_spawnedTask->processEngineEnd(ctx, taskData);
-				lastExecutionPass(ctx, taskData);
-
-				// Forget the spawned task
-				_spawnedTask = nullptr;
-				ctx.persistentData().remove(this->_block, Data_Value_SpawnedTask);
-
-				postExecution(ctx, method, taskData);
-			}
-		}
-		else
-		{
-			QString name = getSpawnTypeName(_spawnType);
-			this->throwProcessorException(QString{"%1 is only possible in batch mode; bypass the %2 block to avoid this warning"}.arg(name).arg(name.toLower()));
-		}
-	}
-	else
-		bypassExecution(ctx);
-}
-
-template<typename BlockType>
-void MachineLearningProcessor<BlockType>::fillTaskData(EngineExecutionContext& ctx, const MachineLearningMethodBase* method, MachineLearningTaskData& taskData) const
+void MachineLearningProcessor<BlockType>::fillTaskData(EngineExecutionContext& ctx, MachineLearningTaskData& taskData) const
 {
-	Q_UNUSED(method);
+	taskData.method = this->template portData<const MachineLearningMethodBase*>(ctx, this->_block->methodPort(), Data_Value_Method);
+	taskData.imageTags = this->template portData<const ImageTags*>(ctx, this->_block->imageTagsPort(), ImageTagsProcessor::Data_Value_ImageTags);
 
 	if (auto dataBlob = this->portData(ctx, this->_block->inPort()))
 		taskData.imageData = dataBlob->getMatrix();
-}
 
-template<typename BlockType>
-void MachineLearningProcessor<BlockType>::taskLoop(EngineExecutionContext& ctx, LongOperation* longOp, const bool* waitCondition) const
-{
-	// Wait for the task to finish
-	while (_spawnedTask->isRunning())
-	{
-		// Process application events and sleep a bit
-		QApplication::processEvents();
-		QThread::msleep(25);
-
-		if (waitCondition)
-		{
-			if (*waitCondition)
-				break;
-		}
-
-		if (longOp->wasCanceled())
-			ctx.abortProcessing();
-
-		if (ctx.wasAborted())
-		{
-			grinder()->taskController().stopTask(_spawnedTask.get());
-			break;
-		}
-	}
+	taskData.state = this->template portData<QString>(ctx, this->_block->statePort(), Data_Value_State, false);
 }
 
 template<typename BlockType>
-void MachineLearningProcessor<BlockType>::spawnTask(const MachineLearningMethodBase* method, QString state)
+void MachineLearningProcessor<BlockType>::spawnTask(const MachineLearningTaskData& taskData)
 {
-	_spawnedTask = nullptr;
+	this->_spawnedTask = nullptr;
 
-	if (auto spawner = method->createTaskSpawner())
+	if (auto spawner = taskData.method->createTaskSpawner())
 	{
-		QString taskName = QString{"%1 %2"}.arg(method->getMethodName()).arg(getSpawnTypeName(_spawnType));
+		QString taskName = QString{"%1 %2"}.arg(taskData.method->getMethodName()).arg(getSpawnTypeName());
 
 		switch (_spawnType)
 		{
 		case SpawnType::Training:
-			_spawnedTask = spawner->spawnTrainingTask(state, taskName);
+			this->_spawnedTask = spawner->spawnTrainingTask(taskData.state, taskName);
 			break;
 
 		case SpawnType::Inference:
-			_spawnedTask = spawner->spawnInferenceTask(state, taskName);
+			this->_spawnedTask = spawner->spawnInferenceTask(taskData.state, taskName);
 			break;
 		}
 
-		if (!_spawnedTask)
+		if (!this->_spawnedTask)
 			this->throwProcessorException("Unable to spawn the machine learning task");
 
-		_spawnedTask->setInfo(QString{"Machine learning task<br><em>%1</em>"}.arg(this->_block->getFormattedName()));
+		this->_spawnedTask->setInfo(QString{"Machine learning task<br><em>%1</em>"}.arg(this->_block->getFormattedName()));
 	}
 	else
 		this->throwProcessorException("Unable to create a task spawner");
 }
 
 template<typename BlockType>
-QString MachineLearningProcessor<BlockType>::getSpawnTypeName(SpawnType type) const
+QString MachineLearningProcessor<BlockType>::getSpawnTypeName() const
 {
-	switch (type)
+	switch (_spawnType)
 	{
 	case SpawnType::Training:
 		return "Training";
diff --git a/Grinder/ml/processors/TrainingProcessor.cpp b/Grinder/ml/processors/TrainingProcessor.cpp
index 6712356..85a9ac6 100644
--- a/Grinder/ml/processors/TrainingProcessor.cpp
+++ b/Grinder/ml/processors/TrainingProcessor.cpp
@@ -13,15 +13,23 @@ TrainingProcessor::TrainingProcessor(const Block* block) : MachineLearningProces
 
 }
 
+void TrainingProcessor::executionPass(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData)
+{
+	MachineLearningProcessor::executionPass(ctx, taskData);
+
+	if (auto dataBlob = portData(ctx, _block->inPort()))
+		_spawnedTask->trainImage(dataBlob->getMatrix(), ctx.activeImageReference());
+}
+
 void TrainingProcessor::lastExecutionPass(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData)
 {
 	// The last image has been reached, so execute the task synchronously
 	executeTask(ctx, taskData);
 }
 
-void TrainingProcessor::fillTaskData(EngineExecutionContext& ctx, const MachineLearningMethodBase* method, MachineLearningTaskData& taskData) const
+void TrainingProcessor::fillTaskData(EngineExecutionContext& ctx, MachineLearningTaskData& taskData) const
 {
-	MachineLearningProcessor::fillTaskData(ctx, method, taskData);
+	MachineLearningProcessor::fillTaskData(ctx, taskData);
 
 	if (auto dataBlob = portData(ctx, _block->tagsBitmapPort()))
 		taskData.imageTagsData = dataBlob->getMatrix();		
diff --git a/Grinder/ml/processors/TrainingProcessor.h b/Grinder/ml/processors/TrainingProcessor.h
index 9ab2725..1154fb4 100644
--- a/Grinder/ml/processors/TrainingProcessor.h
+++ b/Grinder/ml/processors/TrainingProcessor.h
@@ -17,9 +17,10 @@ namespace grndr
 		TrainingProcessor(const Block* block);
 
 	protected:
+		virtual void executionPass(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData) override;
 		virtual void lastExecutionPass(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData) override;
 
-		virtual void fillTaskData(EngineExecutionContext& ctx, const MachineLearningMethodBase* method, MachineLearningTaskData& taskData) const override;
+		virtual void fillTaskData(EngineExecutionContext& ctx, MachineLearningTaskData& taskData) const override;
 
 	private:
 		void executeTask(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData) const;
diff --git a/Grinder/ml/tasks/MachineLearningTask.cpp b/Grinder/ml/tasks/MachineLearningTask.cpp
index 413adc1..e8209c0 100644
--- a/Grinder/ml/tasks/MachineLearningTask.cpp
+++ b/Grinder/ml/tasks/MachineLearningTask.cpp
@@ -5,3 +5,19 @@
 
 #include "Grinder.h"
 #include "MachineLearningTask.h"
+#include "engine/EngineExecutionContext.h"
+
+void MachineLearningTask::processEngineStart(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData)
+{
+	PipelineTask::processEngineStart(ctx, taskData);
+
+	// Keep a copy of the image references that will be processed
+	_imageReferences = ctx.imageReferences();
+}
+
+void MachineLearningTask::processEngineEnd(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData)
+{
+	PipelineTask::processEngineEnd(ctx, taskData);
+
+	_imageReferences.clear();
+}
diff --git a/Grinder/ml/tasks/MachineLearningTask.h b/Grinder/ml/tasks/MachineLearningTask.h
index b87b50c..6e36bab 100644
--- a/Grinder/ml/tasks/MachineLearningTask.h
+++ b/Grinder/ml/tasks/MachineLearningTask.h
@@ -13,11 +13,13 @@
 
 namespace grndr
 {
+	class MachineLearningMethodBase;
 	class ImageTag;
 	class ImageTags;
 
 	struct MachineLearningTaskData
 	{
+		const MachineLearningMethodBase* method{nullptr};
 		const ImageTags* imageTags{nullptr};
 
 		cv::Mat imageData;
@@ -37,11 +39,19 @@ namespace grndr
 		using PipelineTask<MachineLearningTaskData>::PipelineTask;
 
 	public:
+		virtual void processEngineStart(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData) override;
+		virtual void processEngineEnd(EngineExecutionContext& ctx, const MachineLearningTaskData& taskData) override;
+
+	public:
+		virtual void trainImage(const cv::Mat& imageData, const ImageReference* imageRef) { Q_UNUSED(imageData); Q_UNUSED(imageRef); }
 		virtual void inferImage(const cv::Mat& imageData, const ImageReference* imageRef) { Q_UNUSED(imageData); Q_UNUSED(imageRef); }
 
 	signals:
 		void initializationFinished();
 		void inferImageFinished(const ImageInferenceResults& results);
+
+	protected:
+		std::vector<const ImageReference*> _imageReferences;
 	};
 }
 
diff --git a/Grinder/pipeline/Block.h b/Grinder/pipeline/Block.h
index 1c2f5ab..8d26120 100644
--- a/Grinder/pipeline/Block.h
+++ b/Grinder/pipeline/Block.h
@@ -11,6 +11,7 @@
 #include "PipelineItem.h"
 #include "BlockType.h"
 #include "BlockCategory.h"
+#include "BlockDataCache.h"
 #include "PortVector.h"
 
 namespace grndr
@@ -62,6 +63,9 @@ namespace grndr
 		bool isBypassSet() const { return _bypassBlock && _bypassPossible; }
 		void setBypass(bool bypass = true) { _bypassBlock = bypass; }
 
+	public:
+		BlockDataCache& dataCache() const { return _dataCache; }
+
 	public:
 		virtual void serialize(SerializationContext& ctx) const override;
 		virtual void deserialize(DeserializationContext& ctx) override;
@@ -90,6 +94,9 @@ namespace grndr
 
 		bool _bypassPossible{true};
 		bool _bypassBlock{false};
+
+	protected:
+		mutable BlockDataCache _dataCache;
 	};
 }
 
diff --git a/Grinder/pipeline/BlockDataCache.cpp b/Grinder/pipeline/BlockDataCache.cpp
new file mode 100644
index 0000000..d8b5f62
--- /dev/null
+++ b/Grinder/pipeline/BlockDataCache.cpp
@@ -0,0 +1,42 @@
+/******************************************************************************
+ * File: BlockDataCache.cpp
+ * Date: 07.9.2019
+ *****************************************************************************/
+
+#include "Grinder.h"
+#include "BlockDataCache.h"
+
+QVariant BlockDataCache::get(QString name) const
+{
+	auto it = _values.find(name);
+
+	if (it != _values.cend())
+		return it->second;
+
+	return QVariant{};
+}
+
+void BlockDataCache::set(QString name, const QVariant& data)
+{
+	_values[name] = data;
+}
+
+void BlockDataCache::set(QString name, QVariant&& data)
+{
+	_values[name] = std::move(data);
+}
+
+bool BlockDataCache::contains(QString name) const
+{
+	return _values.find(name) != _values.cend();
+}
+
+void BlockDataCache::remove(QString name)
+{
+	_values.erase(name);
+}
+
+void BlockDataCache::clear()
+{
+	_values.clear();
+}
diff --git a/Grinder/pipeline/BlockDataCache.h b/Grinder/pipeline/BlockDataCache.h
new file mode 100644
index 0000000..1199205
--- /dev/null
+++ b/Grinder/pipeline/BlockDataCache.h
@@ -0,0 +1,39 @@
+/******************************************************************************
+ * File: BlockDataCache.h
+ * Date: 07.9.2019
+ *****************************************************************************/
+
+#ifndef BLOCKDATACACHE_H
+#define BLOCKDATACACHE_H
+
+#include <QVariant>
+
+namespace grndr
+{
+	class BlockDataCache
+	{
+	private:
+		using Values = std::map<QString, QVariant>;
+
+	public:
+		QVariant get(QString name) const;
+
+		template<typename ValueType>
+		ValueType get(QString name) const { return get(name).value<ValueType>(); }
+		void set(QString name, const QVariant& data);
+		template<typename ValueType>
+		void set(QString name, const ValueType& data) { set(name, QVariant::fromValue<ValueType>(data)); }
+		void set(QString name, QVariant&& data);
+
+		bool contains(QString name) const;
+
+		void remove(QString name);
+
+		void clear();
+
+	private:
+		Values _values;
+	};
+}
+
+#endif
diff --git a/Grinder/pipeline/processors/PipelineTaskProcessor.h b/Grinder/pipeline/processors/PipelineTaskProcessor.h
new file mode 100644
index 0000000..5952f6c
--- /dev/null
+++ b/Grinder/pipeline/processors/PipelineTaskProcessor.h
@@ -0,0 +1,56 @@
+/******************************************************************************
+ * File: PipelineTaskProcessor.h
+ * Date: 08.9.2019
+ *****************************************************************************/
+
+#ifndef PIPELINETASKPROCESSOR_H
+#define PIPELINETASKPROCESSOR_H
+
+#include "engine/Processor.h"
+
+namespace grndr
+{
+	template<typename BlockType, typename TaskType, typename DataType>
+	class PipelineTaskProcessor : public Processor<BlockType>
+	{
+	public:
+		using task_type = TaskType;
+		using data_type = DataType;
+
+	protected:
+		static const char* Data_Value_SpawnedTask;
+
+	public:
+		PipelineTaskProcessor(const Block* block, bool requiresBatchMode = false);
+
+	public:
+		virtual void execute(EngineExecutionContext& ctx) override;
+
+	protected:
+		virtual void preExecution(EngineExecutionContext& ctx, const data_type& taskData) { Q_UNUSED(ctx); Q_UNUSED(taskData); }
+		virtual void postExecution(EngineExecutionContext& ctx, const data_type& taskData) { Q_UNUSED(ctx); Q_UNUSED(taskData); }
+		virtual void bypassExecution(EngineExecutionContext& ctx) { Q_UNUSED(ctx); }
+
+		virtual void firstExecutionPass(EngineExecutionContext& ctx, const data_type& taskData) { Q_UNUSED(ctx); Q_UNUSED(taskData); }
+		virtual void executionPass(EngineExecutionContext& ctx, const data_type& taskData) { Q_UNUSED(ctx); Q_UNUSED(taskData); }
+		virtual void lastExecutionPass(EngineExecutionContext& ctx, const data_type& taskData) { Q_UNUSED(ctx); Q_UNUSED(taskData); }
+
+		virtual void fillTaskData(EngineExecutionContext& ctx, data_type& taskData) const { Q_UNUSED(ctx); Q_UNUSED(taskData); }
+
+	protected:
+		virtual void spawnTask(const data_type& taskData) = 0;
+
+	protected:
+		void taskLoop(EngineExecutionContext& ctx, LongOperation* longOp, const bool* waitCondition = nullptr) const;
+
+	protected:
+		std::shared_ptr<task_type> _spawnedTask;
+
+	protected:
+		bool _requiresBatchMode{false};
+	};
+}
+
+#include "PipelineTaskProcessor.impl.h"
+
+#endif
diff --git a/Grinder/pipeline/processors/PipelineTaskProcessor.impl.h b/Grinder/pipeline/processors/PipelineTaskProcessor.impl.h
new file mode 100644
index 0000000..92df98c
--- /dev/null
+++ b/Grinder/pipeline/processors/PipelineTaskProcessor.impl.h
@@ -0,0 +1,104 @@
+/******************************************************************************
+ * File: PipelineTaskProcessor.impl.h
+ * Date: 08.9.2019
+ *****************************************************************************/
+
+#include "Grinder.h"
+#include "PipelineTaskProcessor.h"
+#include "core/GrinderApplication.h"
+
+template<typename BlockType, typename TaskType, typename DataType>
+const char* PipelineTaskProcessor<BlockType, TaskType, DataType>::Data_Value_SpawnedTask = "SpawnedTask";
+
+template<typename BlockType, typename TaskType, typename DataType>
+PipelineTaskProcessor<BlockType, TaskType, DataType>::PipelineTaskProcessor(const Block* block, bool requiresBatchMode) : Processor<BlockType>(block),
+	_requiresBatchMode{requiresBatchMode}
+{
+
+}
+
+template<typename BlockType, typename TaskType, typename DataType>
+void PipelineTaskProcessor<BlockType, TaskType, DataType>::execute(EngineExecutionContext& ctx)
+{
+	if (!this->isBlockBypassed())
+	{
+		// Take care of the pipeline task
+		if (!_requiresBatchMode || ctx.hasExecutionFlag(Engine::ExecutionFlag::Batch))
+		{
+			// Gather the data needed by the task
+			data_type taskData;
+			fillTaskData(ctx, taskData);
+
+			// Spawn the task when the first image is active
+			if (ctx.isFirstImage())
+			{
+				preExecution(ctx, taskData);
+
+				spawnTask(taskData);
+				ctx.persistentData().set(this->_block, Data_Value_SpawnedTask, _spawnedTask);	// Store the spawned task in the persistent context data
+			}
+			else
+			{
+				_spawnedTask = ctx.persistentData().get<std::shared_ptr<task_type>>(this->block(), Data_Value_SpawnedTask);	// Retrieve the stored spawned task from the persistent context data
+
+				if (!_spawnedTask)
+					this->throwProcessorException("No machine learning task has been spawned");
+			}
+
+			if (ctx.isFirstImage())
+			{
+				_spawnedTask->processEngineStart(ctx, taskData);
+				firstExecutionPass(ctx, taskData);
+			}
+
+			if (!ctx.wasAborted())	// Only perform passes if not aborted
+			{
+				_spawnedTask->processEnginePass(ctx, taskData);
+				executionPass(ctx, taskData);
+			}
+
+			if (ctx.isLastImage())
+			{
+				_spawnedTask->processEngineEnd(ctx, taskData);
+				lastExecutionPass(ctx, taskData);
+
+				// Forget the spawned task
+				_spawnedTask = nullptr;
+				ctx.persistentData().remove(this->_block, Data_Value_SpawnedTask);
+
+				postExecution(ctx, taskData);
+			}
+		}
+		else
+			this->throwProcessorException("The block can only be used in batch mode; bypass this block to avoid this warning");
+	}
+	else
+		bypassExecution(ctx);
+}
+
+template<typename BlockType, typename TaskType, typename DataType>
+void PipelineTaskProcessor<BlockType, TaskType, DataType>::taskLoop(EngineExecutionContext& ctx, LongOperation* longOp, const bool* waitCondition) const
+{
+	// Wait for the task to finish
+	while (_spawnedTask->isRunning())
+	{
+		// Process application events and sleep a bit
+		QApplication::processEvents();
+		QThread::msleep(25);
+
+		if (waitCondition)
+		{
+			if (*waitCondition)
+				break;
+		}
+
+		if (longOp->wasCanceled())
+			ctx.abortProcessing();
+
+		if (ctx.wasAborted())
+		{
+			grinder()->taskController().stopTask(_spawnedTask.get());
+			break;
+		}
+	}
+}
-- 
GitLab