/********************************************************************* * Software License Agreement (BSD License) * * Copyright (c) 2012, Willow Garage, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimer in the documentation and/or other materials provided * with the distribution. * * Neither the name of Willow Garage nor the names of its * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. *********************************************************************/ /* Author: Ioan Sucan */ #include <moveit/pick_place/manipulation_pipeline.h> #include <ros/console.h> namespace pick_place { ManipulationPipeline::ManipulationPipeline(const std::string& name, unsigned int nthreads) : name_(name), nthreads_(nthreads), verbose_(false), stop_processing_(true) { processing_threads_.resize(nthreads, nullptr); } ManipulationPipeline::~ManipulationPipeline() { reset(); } ManipulationPipeline& ManipulationPipeline::addStage(const ManipulationStagePtr& next) { next->setVerbose(verbose_); stages_.push_back(next); return *this; } const ManipulationStagePtr& ManipulationPipeline::getFirstStage() const { if (stages_.empty()) { static const ManipulationStagePtr EMPTY; return EMPTY; } else return stages_.front(); } const ManipulationStagePtr& ManipulationPipeline::getLastStage() const { if (stages_.empty()) { static const ManipulationStagePtr EMPTY; return EMPTY; } else return stages_.back(); } void ManipulationPipeline::reset() { clear(); stages_.clear(); } void ManipulationPipeline::setVerbose(bool flag) { verbose_ = flag; for (std::size_t i = 0; i < stages_.size(); ++i) stages_[i]->setVerbose(flag); } void ManipulationPipeline::clear() { stop(); { boost::mutex::scoped_lock slock(queue_access_lock_); queue_.clear(); } { boost::mutex::scoped_lock slock(result_lock_); success_.clear(); failed_.clear(); } } void ManipulationPipeline::start() { stop_processing_ = false; empty_queue_threads_ = 0; for (std::size_t i = 0; i < stages_.size(); ++i) stages_[i]->resetStopSignal(); for (std::size_t i = 0; i < processing_threads_.size(); ++i) if (!processing_threads_[i]) processing_threads_[i] = new boost::thread(boost::bind(&ManipulationPipeline::processingThread, this, i)); } void ManipulationPipeline::signalStop() { for (std::size_t i = 0; i < stages_.size(); ++i) stages_[i]->signalStop(); stop_processing_ = true; queue_access_cond_.notify_all(); } void ManipulationPipeline::stop() { signalStop(); for (std::size_t i = 0; i < processing_threads_.size(); ++i) if (processing_threads_[i]) { processing_threads_[i]->join(); delete processing_threads_[i]; processing_threads_[i] = nullptr; } } void ManipulationPipeline::processingThread(unsigned int index) { ROS_DEBUG_STREAM_NAMED("manipulation", "Start thread " << index << " for '" << name_ << "'"); while (!stop_processing_) { bool inc_queue = false; boost::unique_lock<boost::mutex> ulock(queue_access_lock_); // if the queue is empty, we trigger the corresponding event if (queue_.empty() && !stop_processing_ && empty_queue_callback_) { empty_queue_threads_++; inc_queue = true; if (empty_queue_threads_ == processing_threads_.size()) empty_queue_callback_(); } while (queue_.empty() && !stop_processing_) queue_access_cond_.wait(ulock); while (!stop_processing_ && !queue_.empty()) { ManipulationPlanPtr g = queue_.front(); queue_.pop_front(); if (inc_queue) { empty_queue_threads_--; inc_queue = false; } queue_access_lock_.unlock(); try { g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE; for (std::size_t i = 0; !stop_processing_ && i < stages_.size(); ++i) { bool res = stages_[i]->evaluate(g); g->processing_stage_ = i + 1; if (!res) { boost::mutex::scoped_lock slock(result_lock_); failed_.push_back(g); ROS_INFO_STREAM_NAMED("manipulation", "Manipulation plan " << g->id_ << " failed at stage '" << stages_[i]->getName() << "' on thread " << index); break; } } if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS) { g->processing_stage_++; { boost::mutex::scoped_lock slock(result_lock_); success_.push_back(g); } signalStop(); ROS_INFO_STREAM_NAMED("manipulation", "Found successful manipulation plan!"); if (solution_callback_) solution_callback_(); } } catch (std::exception& ex) { ROS_ERROR_NAMED("manipulation", "[%s:%u] %s", name_.c_str(), index, ex.what()); } queue_access_lock_.lock(); } } } void ManipulationPipeline::push(const ManipulationPlanPtr& plan) { boost::mutex::scoped_lock slock(queue_access_lock_); queue_.push_back(plan); ROS_INFO_STREAM_NAMED("manipulation", "Added plan for pipeline '" << name_ << "'. Queue is now of size " << queue_.size()); queue_access_cond_.notify_all(); } void ManipulationPipeline::reprocessLastFailure() { boost::mutex::scoped_lock slock(queue_access_lock_); if (failed_.empty()) return; ManipulationPlanPtr plan = failed_.back(); failed_.pop_back(); plan->clear(); queue_.push_back(plan); ROS_INFO_STREAM_NAMED("manipulation", "Re-added last failed plan for pipeline '" << name_ << "'. Queue is now of size " << queue_.size()); queue_access_cond_.notify_all(); } } // namespace pick_place