/* * Copyright 2018 The Cartographer Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "cartographer/cloud/internal/local_trajectory_uploader.h" #include #include #include "absl/memory/memory.h" #include "async_grpc/client.h" #include "cartographer/cloud/internal/handlers/add_sensor_data_batch_handler.h" #include "cartographer/cloud/internal/handlers/add_trajectory_handler.h" #include "cartographer/cloud/internal/handlers/finish_trajectory_handler.h" #include "cartographer/cloud/internal/sensor/serialization.h" #include "cartographer/common/internal/blocking_queue.h" #include "glog/logging.h" #include "grpc++/grpc++.h" namespace cartographer { namespace cloud { namespace { using absl::make_unique; constexpr int kConnectionTimeoutInSeconds = 10; constexpr int kConnectionRecoveryTimeoutInSeconds = 60; constexpr int kTokenRefreshIntervalInSeconds = 60; const common::Duration kPopTimeout = common::FromMilliseconds(100); // This defines the '::grpc::StatusCode's that are considered unrecoverable // errors and hence no retries will be attempted by the client. const std::set<::grpc::StatusCode> kUnrecoverableStatusCodes = { ::grpc::DEADLINE_EXCEEDED, ::grpc::NOT_FOUND, ::grpc::UNAVAILABLE, ::grpc::UNKNOWN, }; bool IsNewSubmap(const mapping::proto::Submap& submap) { return (submap.has_submap_2d() && submap.submap_2d().num_range_data() == 1) || (submap.has_submap_3d() && submap.submap_3d().num_range_data() == 1); } class LocalTrajectoryUploader : public LocalTrajectoryUploaderInterface { public: struct TrajectoryInfo { // nullopt if uplink has not yet responded to AddTrajectoryRequest. absl::optional uplink_trajectory_id; const std::set expected_sensor_ids; const mapping::proto::TrajectoryBuilderOptions trajectory_options; const std::string client_id; }; public: LocalTrajectoryUploader(const std::string& uplink_server_address, int batch_size, bool enable_ssl_encryption, bool enable_google_auth); ~LocalTrajectoryUploader(); // Starts the upload thread. void Start() final; // Shuts down the upload thread. This method blocks until the shutdown is // complete. void Shutdown() final; grpc::Status AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) final; grpc::Status FinishTrajectory(const std::string& client_id, int local_trajectory_id) final; void EnqueueSensorData(std::unique_ptr sensor_data) final; void TryRecovery(); SensorId GetLocalSlamResultSensorId(int local_trajectory_id) const final { return SensorId{SensorId::SensorType::LOCAL_SLAM_RESULT, "local_slam_result_" + std::to_string(local_trajectory_id)}; } private: void ProcessSendQueue(); // Returns 'false' for failure. bool TranslateTrajectoryId(proto::SensorMetadata* sensor_metadata); grpc::Status RegisterTrajectory(int local_trajectory_id); std::shared_ptr<::grpc::Channel> client_channel_; int batch_size_; std::map local_trajectory_id_to_trajectory_info_; common::BlockingQueue> send_queue_; bool shutting_down_ = false; std::unique_ptr upload_thread_; }; LocalTrajectoryUploader::LocalTrajectoryUploader( const std::string& uplink_server_address, int batch_size, bool enable_ssl_encryption, bool enable_google_auth) : batch_size_(batch_size) { auto channel_creds = enable_google_auth ? grpc::GoogleDefaultCredentials() : (enable_ssl_encryption ? ::grpc::SslCredentials(::grpc::SslCredentialsOptions()) : ::grpc::InsecureChannelCredentials()); client_channel_ = ::grpc::CreateChannel(uplink_server_address, channel_creds); std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::seconds(kConnectionTimeoutInSeconds); LOG(INFO) << "Connecting to uplink " << uplink_server_address; if (!client_channel_->WaitForConnected(deadline)) { LOG(ERROR) << "Failed to connect to " << uplink_server_address; } } LocalTrajectoryUploader::~LocalTrajectoryUploader() {} void LocalTrajectoryUploader::Start() { CHECK(!shutting_down_); CHECK(!upload_thread_); upload_thread_ = make_unique([this]() { this->ProcessSendQueue(); }); } void LocalTrajectoryUploader::Shutdown() { CHECK(!shutting_down_); CHECK(upload_thread_); shutting_down_ = true; upload_thread_->join(); } void LocalTrajectoryUploader::TryRecovery() { if (client_channel_->GetState(false /* try_to_connect */) != grpc_connectivity_state::GRPC_CHANNEL_READY) { LOG(INFO) << "Trying to re-connect to uplink..."; std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::seconds(kConnectionRecoveryTimeoutInSeconds); if (!client_channel_->WaitForConnected(deadline)) { LOG(ERROR) << "Failed to re-connect to uplink prior to recovery attempt."; return; } } LOG(INFO) << "Uplink channel ready, trying recovery."; // Wind the sensor_data_queue forward to the next new submap. LOG(INFO) << "LocalTrajectoryUploader tries to recover with next submap."; while (true) { if (shutting_down_) { return; } proto::SensorData* sensor_data = send_queue_.PeekWithTimeout(kPopTimeout); if (sensor_data) { CHECK_GE(sensor_data->local_slam_result_data().submaps_size(), 0); if (sensor_data->sensor_data_case() == proto::SensorData::kLocalSlamResultData && sensor_data->local_slam_result_data().submaps_size() > 0 && IsNewSubmap(sensor_data->local_slam_result_data().submaps( sensor_data->local_slam_result_data().submaps_size() - 1))) { break; } else { send_queue_.Pop(); } } } // Because the trajectories may be interrupted on the uplink side, we can no // longer upload to those. for (auto& entry : local_trajectory_id_to_trajectory_info_) { entry.second.uplink_trajectory_id.reset(); } // TODO(gaschler): If the uplink did not restart but only the connection was // interrupted, this leaks trajectories in the uplink. // Attempt to recreate the trajectories. for (const auto& entry : local_trajectory_id_to_trajectory_info_) { grpc::Status status = RegisterTrajectory(entry.first); if (!status.ok()) { LOG(ERROR) << "Failed to create trajectory. Aborting recovery attempt. " << status.error_message(); return; } } LOG(INFO) << "LocalTrajectoryUploader recovered."; } void LocalTrajectoryUploader::ProcessSendQueue() { LOG(INFO) << "Starting uploader thread."; proto::AddSensorDataBatchRequest batch_request; while (!shutting_down_) { auto sensor_data = send_queue_.PopWithTimeout(kPopTimeout); if (sensor_data) { if (!TranslateTrajectoryId(sensor_data->mutable_sensor_metadata())) { batch_request.clear_sensor_data(); TryRecovery(); continue; } proto::SensorData* added_sensor_data = batch_request.add_sensor_data(); *added_sensor_data = *sensor_data; // A submap also holds a trajectory id that must be translated to uplink's // trajectory id. if (added_sensor_data->has_local_slam_result_data()) { for (mapping::proto::Submap& mutable_submap : *added_sensor_data->mutable_local_slam_result_data() ->mutable_submaps()) { mutable_submap.mutable_submap_id()->set_trajectory_id( added_sensor_data->sensor_metadata().trajectory_id()); } } if (batch_request.sensor_data_size() == batch_size_) { async_grpc::Client client( client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds), async_grpc::CreateUnlimitedConstantDelayStrategy( common::FromSeconds(1), kUnrecoverableStatusCodes)); if (client.Write(batch_request)) { LOG(INFO) << "Uploaded " << batch_request.ByteSize() << " bytes of sensor data."; batch_request.clear_sensor_data(); continue; } // Unrecoverable error occurred. Attempt recovery. batch_request.clear_sensor_data(); TryRecovery(); } } } } bool LocalTrajectoryUploader::TranslateTrajectoryId( proto::SensorMetadata* sensor_metadata) { auto it = local_trajectory_id_to_trajectory_info_.find( sensor_metadata->trajectory_id()); if (it == local_trajectory_id_to_trajectory_info_.end()) { return false; } if (!it->second.uplink_trajectory_id.has_value()) { // Could not yet register trajectory with uplink server. return false; } int cloud_trajectory_id = it->second.uplink_trajectory_id.value(); sensor_metadata->set_trajectory_id(cloud_trajectory_id); return true; } grpc::Status LocalTrajectoryUploader::AddTrajectory( const std::string& client_id, int local_trajectory_id, const std::set& expected_sensor_ids, const mapping::proto::TrajectoryBuilderOptions& trajectory_options) { CHECK_EQ(local_trajectory_id_to_trajectory_info_.count(local_trajectory_id), 0); local_trajectory_id_to_trajectory_info_.emplace( local_trajectory_id, TrajectoryInfo{{}, expected_sensor_ids, trajectory_options, client_id}); return RegisterTrajectory(local_trajectory_id); } grpc::Status LocalTrajectoryUploader::RegisterTrajectory( int local_trajectory_id) { TrajectoryInfo& trajectory_info = local_trajectory_id_to_trajectory_info_.at(local_trajectory_id); proto::AddTrajectoryRequest request; request.set_client_id(trajectory_info.client_id); *request.mutable_trajectory_builder_options() = trajectory_info.trajectory_options; for (const SensorId& sensor_id : trajectory_info.expected_sensor_ids) { // Range sensors are not forwarded, but combined into a LocalSlamResult. if (sensor_id.type != SensorId::SensorType::RANGE) { *request.add_expected_sensor_ids() = cloud::ToProto(sensor_id); } } *request.add_expected_sensor_ids() = cloud::ToProto(GetLocalSlamResultSensorId(local_trajectory_id)); async_grpc::Client client( client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds)); ::grpc::Status status; if (!client.Write(request, &status)) { LOG(ERROR) << "Failed to register local_trajectory_id " << local_trajectory_id << " with uplink server. " << status.error_message(); return status; } LOG(INFO) << "Created trajectory for client_id: " << trajectory_info.client_id << " local trajectory_id: " << local_trajectory_id << " uplink trajectory_id: " << client.response().trajectory_id(); trajectory_info.uplink_trajectory_id = client.response().trajectory_id(); return status; } grpc::Status LocalTrajectoryUploader::FinishTrajectory( const std::string& client_id, int local_trajectory_id) { auto it = local_trajectory_id_to_trajectory_info_.find(local_trajectory_id); if (it == local_trajectory_id_to_trajectory_info_.end()) { return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "local_trajectory_id has not been" " registered with AddTrajectory."); } auto cloud_trajectory_id = it->second.uplink_trajectory_id; if (!cloud_trajectory_id.has_value()) { return grpc::Status( grpc::StatusCode::UNAVAILABLE, "trajectory_id has not been created in uplink, ignoring."); } proto::FinishTrajectoryRequest request; request.set_client_id(client_id); request.set_trajectory_id(cloud_trajectory_id.value()); async_grpc::Client client( client_channel_, common::FromSeconds(kConnectionTimeoutInSeconds)); grpc::Status status; client.Write(request, &status); return status; } void LocalTrajectoryUploader::EnqueueSensorData( std::unique_ptr sensor_data) { send_queue_.Push(std::move(sensor_data)); } } // namespace std::unique_ptr CreateLocalTrajectoryUploader( const std::string& uplink_server_address, int batch_size, bool enable_ssl_encryption, bool enable_google_auth) { return make_unique(uplink_server_address, batch_size, enable_ssl_encryption, enable_google_auth); } } // namespace cloud } // namespace cartographer