Libosmium  2.8.0
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2016 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <cerrno>
37 #include <cstdlib>
38 #include <fcntl.h>
39 #include <future>
40 #include <memory>
41 #include <string>
42 #include <system_error>
43 #include <thread>
44 #include <utility>
45 
46 #ifndef _WIN32
47 # include <sys/wait.h>
48 #endif
49 
50 #ifndef _MSC_VER
51 # include <unistd.h>
52 #endif
53 
55 #include <osmium/io/detail/input_format.hpp>
56 #include <osmium/io/detail/read_thread.hpp>
57 #include <osmium/io/detail/read_write.hpp>
58 #include <osmium/io/detail/queue_util.hpp>
59 #include <osmium/io/error.hpp>
60 #include <osmium/io/file.hpp>
61 #include <osmium/io/header.hpp>
62 #include <osmium/memory/buffer.hpp>
64 #include <osmium/thread/util.hpp>
65 #include <osmium/util/config.hpp>
66 
67 namespace osmium {
68 
69  namespace io {
70 
71  namespace detail {
72 
73  inline size_t get_input_queue_size() noexcept {
74  size_t n = osmium::config::get_max_queue_size("INPUT", 20);
75  return n > 2 ? n : 2;
76  }
77 
78  inline size_t get_osmdata_queue_size() noexcept {
79  size_t n = osmium::config::get_max_queue_size("OSMDATA", 20);
80  return n > 2 ? n : 2;
81  }
82 
83  } // namespace detail
84 
91  class Reader {
92 
95 
// State of the reader. read() only works in state 'okay'; eof() reports
// true for 'eof' and 'closed'.
enum class status {
    okay   = 0, // reading normally
    error  = 1, // an error occurred while reading
    closed = 2, // close() was called
    eof    = 3  // end of file was reached without error
} m_status;
102 
104 
105  detail::future_string_queue_type m_input_queue;
106 
107  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
108 
109  osmium::io::detail::ReadThreadManager m_read_thread_manager;
110 
111  detail::future_buffer_queue_type m_osmdata_queue;
112  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
113 
114  std::future<osmium::io::Header> m_header_future;
116 
118 
119  // This function will run in a separate thread.
120  static void parser_thread(const osmium::io::File& file,
121  detail::future_string_queue_type& input_queue,
122  detail::future_buffer_queue_type& osmdata_queue,
123  std::promise<osmium::io::Header>&& header_promise,
124  osmium::osm_entity_bits::type read_which_entities) {
125  std::promise<osmium::io::Header> promise = std::move(header_promise);
126  auto creator = detail::ParserFactory::instance().get_creator_function(file);
127  auto parser = creator(input_queue, osmdata_queue, promise, read_which_entities);
128  parser->parse();
129  }
130 
131 #ifndef _WIN32
132 
/**
 * Fork a child process that runs `command -g filename` with its stdout
 * connected to a pipe, and hand the read end of that pipe to the caller.
 *
 * In the child, file descriptors 0 to 31 (except the write end of the
 * pipe) are closed, the pipe becomes stdout, and /dev/null is opened
 * for stdin and stderr.
 *
 * @param command Program to run; looked up by execlp().
 * @param filename Last argument given to the program.
 * @param childpid Out-parameter, set to the process id of the child.
 * @returns File descriptor of the read end of the pipe.
 * @throws std::system_error if pipe() or fork() fails.
 */
static int execute(const std::string& command, const std::string& filename, int* childpid) {
    int pipefd[2];
    if (pipe(pipefd) < 0) {
        throw std::system_error(errno, std::system_category(), "opening pipe failed");
    }
    const pid_t pid = fork();
    if (pid < 0) {
        // Keep the reason for the failure, close() may change errno.
        const int fork_errno = errno;
        // Do not leak the pipe if no child was created.
        ::close(pipefd[0]);
        ::close(pipefd[1]);
        throw std::system_error(fork_errno, std::system_category(), "fork failed");
    }
    if (pid == 0) { // child
        // close all file descriptors except one end of the pipe
        for (int i = 0; i < 32; ++i) {
            if (i != pipefd[1]) {
                ::close(i);
            }
        }
        // Use _exit() in the child: exit() would run the atexit handlers
        // and flush the stdio buffers inherited from the parent.
        if (dup2(pipefd[1], 1) < 0) { // put write end of pipe as stdout
            ::_exit(1);
        }

        ::open("/dev/null", O_RDONLY); // stdin
        ::open("/dev/null", O_WRONLY); // stderr
        // hack: -g switches off globbing in curl which allows [] to be used in file names
        // this is important for XAPI URLs
        // in theory this execute() function could be used for other commands, but it is
        // only used for curl at the moment, so this is okay.
        ::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr);

        // execlp() only returns if it failed. The child must never
        // continue into the code of the parent.
        ::_exit(1);
    }
    // parent
    *childpid = pid;
    ::close(pipefd[1]);
    return pipefd[0];
}
178 #endif
179 
188  static int open_input_file_or_url(const std::string& filename, int* childpid) {
189  std::string protocol = filename.substr(0, filename.find_first_of(':'));
190  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
191 #ifndef _WIN32
192  return execute("curl", filename, childpid);
193 #else
194  throw io_error("Reading OSM files from the network currently not supported on Windows.");
195 #endif
196  } else {
197  return osmium::io::detail::open_for_reading(filename);
198  }
199  }
200 
201  public:
202 
213  m_file(file.check()),
214  m_read_which_entities(read_which_entities),
215  m_status(status::okay),
216  m_childpid(0),
217  m_input_queue(detail::get_input_queue_size(), "raw_input"),
218  m_decompressor(m_file.buffer() ?
219  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), m_file.buffer(), m_file.buffer_size()) :
220  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), open_input_file_or_url(m_file.filename(), &m_childpid))),
221  m_read_thread_manager(*m_decompressor, m_input_queue),
222  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
223  m_osmdata_queue_wrapper(m_osmdata_queue),
224  m_header_future(),
225  m_header(),
226  m_thread() {
227  std::promise<osmium::io::Header> header_promise;
228  m_header_future = header_promise.get_future();
229  m_thread = osmium::thread::thread_handler{parser_thread, std::ref(m_file), std::ref(m_input_queue), std::ref(m_osmdata_queue), std::move(header_promise), read_which_entities};
230  }
231 
232  explicit Reader(const std::string& filename, osmium::osm_entity_bits::type read_types = osmium::osm_entity_bits::all) :
233  Reader(osmium::io::File(filename), read_types) {
234  }
235 
236  explicit Reader(const char* filename, osmium::osm_entity_bits::type read_types = osmium::osm_entity_bits::all) :
237  Reader(osmium::io::File(filename), read_types) {
238  }
239 
240  Reader(const Reader&) = delete;
241  Reader& operator=(const Reader&) = delete;
242 
243  Reader(Reader&&) = default;
244  Reader& operator=(Reader&&) = default;
245 
246  ~Reader() noexcept {
247  try {
248  close();
249  } catch (...) {
250  // Ignore any exceptions because destructor must not throw.
251  }
252  }
253 
262  void close() {
263  m_status = status::closed;
264 
265  m_read_thread_manager.stop();
266 
267  m_osmdata_queue_wrapper.drain();
268 
269  try {
270  m_read_thread_manager.close();
271  } catch (...) {
272  // Ignore any exceptions.
273  }
274 
275 #ifndef _WIN32
276  if (m_childpid) {
277  int status;
278  pid_t pid = ::waitpid(m_childpid, &status, 0);
279 #pragma GCC diagnostic push
280 #pragma GCC diagnostic ignored "-Wold-style-cast"
281  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) {
282  throw std::system_error(errno, std::system_category(), "subprocess returned error");
283  }
284 #pragma GCC diagnostic pop
285  m_childpid = 0;
286  }
287 #endif
288  }
289 
297  if (m_status == status::error) {
298  throw io_error("Can not get header from reader when in status 'error'");
299  }
300 
301  try {
302  if (m_header_future.valid()) {
303  m_header = m_header_future.get();
304  if (m_read_which_entities == osmium::osm_entity_bits::nothing) {
305  m_status = status::eof;
306  }
307  }
308  } catch (...) {
309  close();
310  m_status = status::error;
311  throw;
312  }
313  return m_header;
314  }
315 
327  osmium::memory::Buffer buffer;
328 
329  if (m_status != status::okay ||
330  m_read_which_entities == osmium::osm_entity_bits::nothing) {
331  throw io_error("Can not read from reader when in status 'closed', 'eof', or 'error'");
332  }
333 
334  try {
335  // m_input_format.read() can return an invalid buffer to signal EOF,
336  // or a valid buffer with or without data. A valid buffer
337  // without data is not an error, it just means we have to
338  // keep getting the next buffer until there is one with data.
339  while (true) {
340  buffer = m_osmdata_queue_wrapper.pop();
341  if (detail::at_end_of_data(buffer)) {
342  m_status = status::eof;
343  m_read_thread_manager.close();
344  return buffer;
345  }
346  if (buffer.committed() > 0) {
347  return buffer;
348  }
349  }
350  } catch (...) {
351  close();
352  m_status = status::error;
353  throw;
354  }
355  }
356 
361  bool eof() const {
362  return m_status == status::eof || m_status == status::closed;
363  }
364 
365  }; // class Reader
366 
375  template <typename... TArgs>
378 
379  Reader reader(std::forward<TArgs>(args)...);
380  while (osmium::memory::Buffer read_buffer = reader.read()) {
381  buffer.add_buffer(read_buffer);
382  buffer.commit();
383  }
384 
385  return buffer;
386  }
387 
388  } // namespace io
389 
390 } // namespace osmium
391 
392 #endif // OSMIUM_IO_READER_HPP
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:112
status
Definition: reader.hpp:96
osmium::memory::Buffer read()
Definition: reader.hpp:326
Reader(const osmium::io::File &file, osmium::osm_entity_bits::type read_which_entities=osmium::osm_entity_bits::all)
Definition: reader.hpp:212
type
Definition: entity_bits.hpp:63
int m_childpid
Definition: reader.hpp:103
std::future< osmium::io::Header > m_header_future
Definition: reader.hpp:114
std::string get(const std::string &key, const std::string &default_value="") const noexcept
Definition: options.hpp:124
osmium::memory::Buffer read_file(TArgs &&...args)
Definition: reader.hpp:376
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:107
bool eof() const
Definition: reader.hpp:361
object or changeset
Definition: entity_bits.hpp:74
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:105
osmium::io::File m_file
Definition: reader.hpp:93
Definition: file.hpp:74
Namespace for everything in the Osmium library.
Definition: assembler.hpp:66
Definition: attr.hpp:298
void add_buffer(const Buffer &buffer)
Definition: buffer.hpp:474
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:109
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:143
Definition: reader.hpp:91
~Reader() noexcept
Definition: reader.hpp:246
Definition: error.hpp:44
size_t committed() const noexcept
Definition: buffer.hpp:241
static void parser_thread(const osmium::io::File &file, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, osmium::osm_entity_bits::type read_which_entities)
Definition: reader.hpp:120
osmium::io::Header header()
Definition: reader.hpp:296
size_t get_max_queue_size(const char *queue_name, size_t default_value) noexcept
Definition: config.hpp:69
Definition: buffer.hpp:97
osmium::thread::thread_handler m_thread
Definition: reader.hpp:117
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:188
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:111
osmium::io::Header m_header
Definition: reader.hpp:115
Definition: entity_bits.hpp:65
Reader(const char *filename, osmium::osm_entity_bits::type read_types=osmium::osm_entity_bits::all)
Definition: reader.hpp:236
Definition: compression.hpp:117
Definition: header.hpp:49
void close()
Definition: reader.hpp:262
osmium::osm_entity_bits::type m_read_which_entities
Definition: reader.hpp:94
size_t commit()
Definition: buffer.hpp:335
Definition: util.hpp:83
Reader(const std::string &filename, osmium::osm_entity_bits::type read_types=osmium::osm_entity_bits::all)
Definition: reader.hpp:232