Skip to content
Snippets Groups Projects
Select Git revision
  • 0015a0195edd1eb25c83256d62d66207fd7247bc
  • master default protected
  • ccnc-demo
  • noetic/main
  • base
5 results

manipulation_pipeline.cpp

Blame
  • SebastianEbert's avatar
    Sebastian Ebert authored
    0015a019
    History
    manipulation_pipeline.cpp 7.05 KiB
    /*********************************************************************
    * Software License Agreement (BSD License)
    *
    *  Copyright (c) 2012, Willow Garage, Inc.
    *  All rights reserved.
    *
    *  Redistribution and use in source and binary forms, with or without
    *  modification, are permitted provided that the following conditions
    *  are met:
    *
    *   * Redistributions of source code must retain the above copyright
    *     notice, this list of conditions and the following disclaimer.
    *   * Redistributions in binary form must reproduce the above
    *     copyright notice, this list of conditions and the following
    *     disclaimer in the documentation and/or other materials provided
    *     with the distribution.
    *   * Neither the name of Willow Garage nor the names of its
    *     contributors may be used to endorse or promote products derived
    *     from this software without specific prior written permission.
    *
    *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    *  POSSIBILITY OF SUCH DAMAGE.
    *********************************************************************/
    
    /* Author: Ioan Sucan */
    
    #include <moveit/pick_place/manipulation_pipeline.h>
    #include <ros/console.h>
    
    namespace pick_place
    {
    ManipulationPipeline::ManipulationPipeline(const std::string& name, unsigned int nthreads)
      : name_(name), nthreads_(nthreads), verbose_(false), stop_processing_(true)
    {
      processing_threads_.resize(nthreads, nullptr);
    }
    
    ManipulationPipeline::~ManipulationPipeline()
    {
      reset();
    }
    
    ManipulationPipeline& ManipulationPipeline::addStage(const ManipulationStagePtr& next)
    {
      next->setVerbose(verbose_);
      stages_.push_back(next);
      return *this;
    }
    
    const ManipulationStagePtr& ManipulationPipeline::getFirstStage() const
    {
      if (stages_.empty())
      {
        static const ManipulationStagePtr EMPTY;
        return EMPTY;
      }
      else
        return stages_.front();
    }
    
    const ManipulationStagePtr& ManipulationPipeline::getLastStage() const
    {
      if (stages_.empty())
      {
        static const ManipulationStagePtr EMPTY;
        return EMPTY;
      }
      else
        return stages_.back();
    }
    
    void ManipulationPipeline::reset()
    {
      clear();
      stages_.clear();
    }
    
    void ManipulationPipeline::setVerbose(bool flag)
    {
      verbose_ = flag;
      for (pick_place::ManipulationStagePtr& stage : stages_)
        stage->setVerbose(flag);
    }
    
    void ManipulationPipeline::clear()
    {
      stop();
      {
        boost::mutex::scoped_lock slock(queue_access_lock_);
        queue_.clear();
      }
      {
        boost::mutex::scoped_lock slock(result_lock_);
        success_.clear();
        failed_.clear();
      }
    }
    
    void ManipulationPipeline::start()
    {
      stop_processing_ = false;
      empty_queue_threads_ = 0;
      for (pick_place::ManipulationStagePtr& stage : stages_)
        stage->resetStopSignal();
      for (std::size_t i = 0; i < processing_threads_.size(); ++i)
        if (!processing_threads_[i])
          processing_threads_[i] = new boost::thread(boost::bind(&ManipulationPipeline::processingThread, this, i));
    }
    
    void ManipulationPipeline::signalStop()
    {
      for (pick_place::ManipulationStagePtr& stage : stages_)
        stage->signalStop();
      stop_processing_ = true;
      queue_access_cond_.notify_all();
    }
    
    void ManipulationPipeline::stop()
    {
      signalStop();
      for (boost::thread*& processing_thread : processing_threads_)
        if (processing_thread)
        {
          processing_thread->join();
          delete processing_thread;
          processing_thread = nullptr;
        }
    }
    
    void ManipulationPipeline::processingThread(unsigned int index)
    {
      ROS_DEBUG_STREAM_NAMED("manipulation", "Start thread " << index << " for '" << name_ << "'");
    
      while (!stop_processing_)
      {
        bool inc_queue = false;
        boost::unique_lock<boost::mutex> ulock(queue_access_lock_);
        // if the queue is empty, we trigger the corresponding event
        if (queue_.empty() && !stop_processing_ && empty_queue_callback_)
        {
          empty_queue_threads_++;
          inc_queue = true;
          if (empty_queue_threads_ == processing_threads_.size())
            empty_queue_callback_();
        }
        while (queue_.empty() && !stop_processing_)
          queue_access_cond_.wait(ulock);
        while (!stop_processing_ && !queue_.empty())
        {
          ManipulationPlanPtr g = queue_.front();
          queue_.pop_front();
          if (inc_queue)
          {
            empty_queue_threads_--;
            inc_queue = false;
          }
    
          queue_access_lock_.unlock();
          try
          {
            g->error_code_.val = moveit_msgs::MoveItErrorCodes::FAILURE;
            for (std::size_t i = 0; !stop_processing_ && i < stages_.size(); ++i)
            {
              bool res = stages_[i]->evaluate(g);
              g->processing_stage_ = i + 1;
              if (!res)
              {
                boost::mutex::scoped_lock slock(result_lock_);
                failed_.push_back(g);
                ROS_INFO_STREAM_NAMED("manipulation", "Manipulation plan " << g->id_ << " failed at stage '"
                                                                           << stages_[i]->getName() << "' on thread "
                                                                           << index);
                break;
              }
            }
            if (g->error_code_.val == moveit_msgs::MoveItErrorCodes::SUCCESS)
            {
              g->processing_stage_++;
              {
                boost::mutex::scoped_lock slock(result_lock_);
                success_.push_back(g);
              }
              signalStop();
              ROS_INFO_STREAM_NAMED("manipulation", "Found successful manipulation plan!");
              if (solution_callback_)
                solution_callback_();
            }
          }
          catch (std::exception& ex)
          {
            ROS_ERROR_NAMED("manipulation", "[%s:%u] %s", name_.c_str(), index, ex.what());
          }
          queue_access_lock_.lock();
        }
      }
    }
    
    void ManipulationPipeline::push(const ManipulationPlanPtr& plan)
    {
      boost::mutex::scoped_lock slock(queue_access_lock_);
      queue_.push_back(plan);
      ROS_INFO_STREAM_NAMED("manipulation", "Added plan for pipeline '" << name_ << "'. Queue is now of size "
                                                                        << queue_.size());
      queue_access_cond_.notify_all();
    }
    
    void ManipulationPipeline::reprocessLastFailure()
    {
      boost::mutex::scoped_lock slock(queue_access_lock_);
      if (failed_.empty())
        return;
      ManipulationPlanPtr plan = failed_.back();
      failed_.pop_back();
      plan->clear();
      queue_.push_back(plan);
      ROS_INFO_STREAM_NAMED("manipulation", "Re-added last failed plan for pipeline '"
                                                << name_ << "'. Queue is now of size " << queue_.size());
      queue_access_cond_.notify_all();
    }
    }  // namespace pick_place