Concurrency of long-running operations

I have worked on the other type of concurrency mentioned in my first post above: row-wise concurrency.

Here is a reminder of the use case. Say you have a column to reconcile and then want to fetch more columns from the reconciled values. If the reconciliation process is easy enough (with the service reliably auto-matching cells), then you might want to queue the data extension process to run directly after reconciliation. OpenRefine should not have to wait until the last cell is reconciled before fetching the new columns on the first few rows.

The crucial difficulty here is that the data extension process should not try to fetch the new columns for rows which are not reconciled yet. It needs to wait for the reconciliation process to output reconciled rows. One can think of it as a pipeline, where each operation is a row-processing device, with pipes connecting an operation to the next one.

Problem statement

First, how do long-running operations currently work in the new architecture? They work by defining a "change data producer", which, for each row or record, produces some data of a given type. This data is written to a dedicated file, with each line containing a pair of the row/record id and the fetched data (serialized in JSON). The file is compressed with Gzip and split into multiple partitions. Each partition file is written to disk gradually, as the data is fetched.

The new state of the project's grid is obtained by doing a join (in the terminology of relational databases) between the original project grid and the change data, joining the original rows with the fetched data to produce the new rows. Because both the original grid and the change data are sorted by row/record ids and have the same partition boundaries, this join can be computed efficiently using a joint scan of both operands. When executing this join while the change data is still being fetched, missing rows are replaced by placeholder values, which make it possible to display the grid before completion of the fetching process.
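To make the join step more concrete, here is a simplified, self-contained sketch (not the actual OpenRefine classes or serialization format): a joint scan over the sorted row ids and the sorted change data fetched so far, emitting a placeholder for rows whose data is not available yet.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration (not the actual OpenRefine classes): joining a sorted
// stream of row ids with a sorted stream of fetched change data. Row ids with
// no fetched data yet are joined with a placeholder value.
public class ChangeDataJoinSketch {

    public static void main(String[] args) {
        // original grid: row ids 0..4 (cell values omitted for brevity)
        long[] rowIds = {0, 1, 2, 3, 4};

        // change data fetched so far, keyed by row id (sorted)
        TreeMap<Long, String> changeData = new TreeMap<>();
        changeData.put(0L, "{\"extension\": \"foo\"}");
        changeData.put(1L, "{\"extension\": \"bar\"}");
        // rows 2..4 have not been fetched yet

        Iterator<Map.Entry<Long, String>> fetched = changeData.entrySet().iterator();
        Map.Entry<Long, String> current = fetched.hasNext() ? fetched.next() : null;

        // joint scan over both sorted operands
        for (long rowId : rowIds) {
            if (current != null && current.getKey() == rowId) {
                System.out.println(rowId + " -> " + current.getValue());
                current = fetched.hasNext() ? fetched.next() : null;
            } else {
                // no data fetched for this row yet: emit a placeholder
                System.out.println(rowId + " -> <pending>");
            }
        }
    }
}
```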

Consider the second long-running process (such as the data extension fetching process in this running example). It needs to read the grid it is executed on, but should not be able to read the rows with placeholder values. Instead, iterating on this grid should be done in a blocking way: the process should wait for the next row to be computed before being able to fetch it.

In UNIX terms, this behaviour is quite similar to piping processes together, either with the | symbol in bash or by using named pipes with mkfifo. However, in our case we also need to write the change data to disk (to be able to save the project, to display intermediate states with the Undo/Redo functionality, to be able to recover after a crash, and so on). So we need the second process to read the contents of the change data file that the first process writes to, similar to what the tail -f command does. So I have attempted to implement something that follows the same spirit.

Changes required on the project serialization format

The tail -f command works by listening to changes made to a file using the inotify API. Every time a write is made to this file, the operating system notifies the tail process, which then reads the file from where it previously stopped. Luckily there is a platform-independent interface for this in Java, WatchService, so I have attempted to make a prototype based on it.
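For illustration, here is a minimal, self-contained sketch of that pattern (not the actual prototype code): it registers the parent directory with a WatchService and re-reads the file from the last known offset whenever the OS reports a modification.

```java
import java.io.RandomAccessFile;
import java.nio.file.*;

// Minimal "tail -f"-style sketch using java.nio.file.WatchService (illustration
// only): re-read a file from the last known offset every time the operating
// system signals a modification in its parent directory.
public class TailSketch {

    public static void main(String[] args) throws Exception {
        Path file = Paths.get(args[0]);
        Path dir = file.toAbsolutePath().getParent();
        long offset = 0;

        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            dir.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
            while (true) {
                WatchKey key = watcher.take(); // blocks until a change is reported
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (file.getFileName().equals(event.context())) {
                        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
                            raf.seek(offset);
                            String line;
                            while ((line = raf.readLine()) != null) {
                                System.out.println(line);
                            }
                            offset = raf.getFilePointer(); // remember where we stopped
                        }
                    }
                }
                key.reset(); // re-arm the key so further events are delivered
            }
        }
    }
}
```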

Compression

One problem is that our change data files are compressed in Gzip, and this compression format does not support resuming reading from an arbitrary point in the file: you always need to read from the start of the file. If the decompressor encounters an unexpected end of file (which we expect to happen in a context where the compressed file is being written to by another process), then there is no simple way to save the state of the decompressor and try reading again when more content is available.

Therefore I had to change the compression format to a codec which supports this better. I went for the Zstd format because it is reasonably established, seems to have good performance, and there is an actively maintained library, zstd-jni, which implements this codec with streaming support. The library is used by major projects such as Apache Spark and the like, so I think it's unlikely to go bust (and if it does, we should be able to imitate those other major dependencies). I would of course be happy to consider other compression formats.
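As a minimal sketch of what streaming writes and reads look like with zstd-jni (assuming the com.github.luben:zstd-jni dependency; the actual change data serialization is of course more involved):

```java
import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;

import java.io.*;
import java.nio.charset.StandardCharsets;

// Minimal sketch of streaming Zstd compression with zstd-jni (illustration only).
// Lines are written incrementally, as a long-running fetching process would do,
// and read back with a streaming decompressor.
public class ZstdStreamingSketch {

    public static void main(String[] args) throws IOException {
        File partition = new File("partition-00000.zst");

        // write a few lines, flushing as we go
        try (Writer writer = new OutputStreamWriter(
                new ZstdOutputStream(new FileOutputStream(partition)), StandardCharsets.UTF_8)) {
            writer.write("0 {\"extension\": \"foo\"}\n");
            writer.flush();
            writer.write("1 {\"extension\": \"bar\"}\n");
        }

        // read the partition back through the streaming decompressor
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ZstdInputStream(new FileInputStream(partition)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
```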

End marker

When a process reads a change data file and reaches the end of the file, it needs to know whether that's the actual end of the partition or if more data might be written by the upstream operation later on. If the change data file ends abruptly in the middle of a line or in a compression block then it's clear that more data is coming, but it can also be that the file ends cleanly with a complete line, in which case there is no way to tell.

So to cater for this need I introduced an end marker, for now simply "end", added at the end of each partition file. It's unambiguous because all other lines start with the row/record number.

This marker is only there to signify the completion of a partition file: the completion of the entire change data is indicated by adding an empty _SUCCESS file in the same directory as the partitions, just like Spark does.
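Put together, here is a simplified sketch of both completion markers. The "end" line and the _SUCCESS file are as described above, but the exact line format (row id, a space, then the JSON payload) is only illustrative.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the completion markers: each partition file ends with
// an "end" line, and an empty _SUCCESS file is created once all partitions are
// complete. The data line format here is hypothetical.
public class CompletionMarkersSketch {

    // write one partition: one line per row id, then the end marker
    static void writePartition(Path file, Map<Long, String> fetched) throws IOException {
        try (Writer writer = Files.newBufferedWriter(file)) {
            for (Map.Entry<Long, String> entry : fetched.entrySet()) {
                writer.write(entry.getKey() + " " + entry.getValue() + "\n");
            }
            writer.write("end\n"); // unambiguous: data lines start with a row id
        }
    }

    // mark the whole change data as complete, Spark-style
    static void markSuccess(Path changeDataDir) throws IOException {
        Files.createFile(changeDataDir.resolve("_SUCCESS"));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("change-data");
        writePartition(dir.resolve("partition-00000"), new TreeMap<>(Map.of(0L, "{}", 1L, "{}")));
        markSuccess(dir);
        System.out.println(Files.readAllLines(dir.resolve("partition-00000")));
    }
}
```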

Touching all partition files when starting to write a change data

So far, when writing the data fetched by an operation to disk, we would only start writing a partition file when the corresponding partition starts to get processed. This is a problem for synchronous fetching, because the observable number of partitions in the change data will change over time. The collections we use, PLLs, come with the assumption that their partitions are immutable. To keep this property (which is generally useful), I have decided to make sure that all partition files are touched (meaning an empty file with the expected filename is created) before any actual writing happens. The performance cost associated with this change should only become noticeable with hundreds or thousands of partitions, meaning many millions of rows, so I think that's acceptable.
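A short sketch of that idea (hypothetical file names), fixing the set of partition files on disk before any data is written:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustration: create all (empty) partition files up front, so the set of
// partitions observed by readers never changes while the data is being fetched.
public class TouchPartitionsSketch {

    static void touchPartitions(Path changeDataDir, int numPartitions) throws IOException {
        Files.createDirectories(changeDataDir);
        for (int i = 0; i < numPartitions; i++) {
            Path partition = changeDataDir.resolve(String.format("partition-%05d", i));
            if (!Files.exists(partition)) {
                Files.createFile(partition); // empty file, filled in later by the fetching process
            }
        }
    }

    public static void main(String[] args) throws IOException {
        touchPartitions(Files.createTempDirectory("change-data"), 4);
    }
}
```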

Changes required in the tool's architecture

Choosing between asynchronous and synchronous iteration

When iterating from a grid, we need to be able to switch between two behaviours:

  • the existing behaviour, which iterates over the entire grid and replaces missing values not computed yet by placeholders (used to display the grid in the web interface or to compute facets, for instance). This should not require setting up any WatchService or add any overhead compared to what we are doing now;
  • the new behaviour, a blocking iteration which waits for missing values to be computed, using the WatchService.

I implemented this by providing an IterationContext to the iterate/compute methods of the PLL interface (see the corresponding change).
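As a purely hypothetical shape of such a context, just to convey the idea (the actual interface is in the linked change, and its method names and constants will differ):

```java
// Hypothetical sketch of the context passed to iterate/compute (not the actual
// interface from the pull request): callers choose whether iteration should
// block on rows that are not computed yet, or fall back to placeholders.
public interface IterationContext {

    /**
     * @return true if iteration should wait for missing values to be computed,
     *         false if placeholders should be returned instead
     */
    boolean blockOnIncompleteData();

    IterationContext SYNCHRONOUS = () -> true;   // blocking iteration, backed by a WatchService
    IterationContext ASYNCHRONOUS = () -> false; // existing behaviour: placeholders, no extra overhead
}
```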

Better record grouping

Given a project grid, accessing its records implies deriving a PLL&lt;Record&gt; out of the PLL&lt;IndexedRow&gt; of the rows in the grid. Currently, whenever this record PLL is computed, we run a small task which inspects the records that are on partition boundaries. This has the benefit of making it possible to then iterate from a partition of records simply by iterating on the underlying partition of rows, grouping the rows into records on the fly and not having to look at the next partition for the end of the last record.

But if the underlying collection of rows is not fully computed yet, then this does not work: the records at the partition boundaries might still change until the grid has been fetched entirely. So we will need to make this record grouping lazier: do not run the task when creating the collection, but instead let a process iterating over a given partition of records also inspect the next partition of rows to finish the last record in the partition.

I have not implemented this approach yet but I think this should overall be an improvement, even in other use cases, as we are sparing ourselves a task, which should make accessing records faster overall.
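To make the idea more concrete, here is a simplified sketch of the lazy grouping (not the actual PLL code), assuming a toy row model in which a row with a non-blank key starts a new record: the iterator over one partition of rows finishes its last record by peeking into the next partition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified sketch of lazy record grouping (illustration only): rows with a
// non-null key start a new record, and grouping a partition of rows reads into
// the next partition of rows to finish the last record.
public class RecordGroupingSketch {

    record Row(long index, String key) {}
    record Record(long startIndex, List<Row> rows) {}

    static List<Record> groupPartition(Iterator<Row> partition, Iterator<Row> nextPartition) {
        List<Record> records = new ArrayList<>();
        List<Row> current = null;
        while (partition.hasNext()) {
            Row row = partition.next();
            if (row.key() != null || current == null) {
                // a non-blank key starts a new record (leading key-less rows start one too)
                if (current != null) {
                    records.add(new Record(current.get(0).index(), current));
                }
                current = new ArrayList<>();
            }
            current.add(row);
        }
        // finish the last record with the key-less rows at the start of the next partition
        while (current != null && nextPartition.hasNext()) {
            Row row = nextPartition.next();
            if (row.key() != null) {
                break;
            }
            current.add(row);
        }
        if (current != null) {
            records.add(new Record(current.get(0).index(), current));
        }
        return records;
    }

    public static void main(String[] args) {
        List<Row> partition = List.of(new Row(0, "a"), new Row(1, null), new Row(2, "b"));
        List<Row> next = List.of(new Row(3, null), new Row(4, "c"));
        System.out.println(groupPartition(partition.iterator(), next.iterator()));
    }
}
```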

As always, I would welcome feedback on the approach. I do realize that this goes rather deep into the inner workings of the new architecture, which basically no one other than me is familiar with, but perhaps people have thoughts about the high-level approach and about some implementation details, such as the reliance on the WatchService and zstd-jni.

Overall, although the architectural changes are quite involved, I think this is a change that is worth pursuing because:

  • it will have a big impact on the user experience, significantly reducing the frequency of cases where the user needs to wait for the completion of a long-running operation before carrying on with other operations
  • the project serialization format is still easy to change since we haven't released this architecture yet, so now is a good time to investigate what is needed to support this.

Here is a draft PR containing the changes I have made towards this so far: Support for synchronous iteration from partially computed grids by wetneb · Pull Request #343 · wetneb/OpenRefine · GitHub