Partial results of long-running operations

Here is another monthly update on this front.

This month I worked on various tasks. The most salient aspect from a user perspective is the ability to pause and resume long-running operations. When you reconcile, fetch data from URLs, fetch data from reconciled values or upload data to Wikibase, you now have a pause button. What is perhaps more interesting is that if OpenRefine shuts down in the middle of a long-running process, it will be able to pick up where it left off after restart. This is designed to work even if OpenRefine is shut down uncleanly, to cover cases such as power outages. In those cases, you might lose a few rows' worth of work in the operation results, but you should generally be able to recover most of the results as long as your HDD/SSD coped with the interruption nicely.

I again made a video about that:

Here are some details about the more technical aspects of the work, on which your feedback would be welcome:

Implementation of pausing

While the idea of pausing a process is a rather simple one, implementing this was quite some work. There is no simple way to tell an arbitrary Java thread to pause its work: the code running in that thread must have been designed to check by itself if it should pause, and do so if some condition is met. That’s actually useful because it helps make sure that the thread is pausing in a meaningful state (for instance, not when it’s in the middle of talking to some web server). The challenge was to implement pausing in a way that lets extensions define their own fetching processes that also benefit from the pause feature, and that is compatible with the Runner / Grid / ChangeData interfaces, so that pausing can work with different runner implementations.
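To make the idea of cooperative pausing concrete, here is a minimal sketch of a pause signal that a worker thread checks by itself at safe points. The class and method names (PauseSignal, awaitIfPaused) are made up for this illustration and are not OpenRefine's actual classes.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical pause signal, for illustration only (not OpenRefine's own code). */
class PauseSignal {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private boolean paused = false;

    /** Asks the worker to stop at its next safe point. */
    void pause() {
        lock.lock();
        try {
            paused = true;
        } finally {
            lock.unlock();
        }
    }

    /** Lets a paused worker continue. */
    void resume() {
        lock.lock();
        try {
            paused = false;
            resumed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Called by the worker between two units of work; blocks while paused. */
    void awaitIfPaused() throws InterruptedException {
        lock.lock();
        try {
            while (paused) {
                resumed.await();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

A worker would call awaitIfPaused() between two units of work (for instance after each row), so it never stops in the middle of a web request.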

I went for the approach of adding some asynchronous versions of the methods offered by the Grid and ChangeData interfaces, similarly to what Spark does.

An async method is a method which returns immediately, instead of blocking until its result is available. The corresponding process is done in the background (meaning, in another thread) and the return value of the method is a Future object, which is essentially a handle on the eventual result. Futures are a standard concept in many programming languages and have made their way into the JDK as well, as java.util.concurrent.Future. Beyond accessing the eventual return value of the process, they also provide some limited capability to intervene in that process, such as cancelling it.
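As a small illustration of the difference between a blocking method and its async counterpart, here is a sketch using only the JDK. The class and method names are invented for the example and have nothing to do with the Grid or ChangeData interfaces.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical example class, not part of OpenRefine. */
class AsyncCounting {
    /** Blocking version: returns only once the result is known. */
    static long countNonEmpty(List<String> rows) {
        return rows.stream().filter(row -> !row.isEmpty()).count();
    }

    /** Async version: returns immediately with a handle on the eventual result. */
    static Future<Long> countNonEmptyAsync(ExecutorService executor, List<String> rows) {
        Callable<Long> task = () -> countNonEmpty(rows);
        // The task runs on one of the executor's threads, not on the caller's.
        return executor.submit(task);
    }
}
```

The caller can later block on future.get() to obtain the value, or call future.cancel(true) to ask for the computation to stop.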

On its own, this interface is a bit limiting: there is no way to pause a future, for instance. So I have followed the example of the Guava library, which provides an extended Future interface with the ability to register a callback. I have done the same and made a ProgressingFuture, which extends Guava’s ListenableFuture and adds support for two things:

  • pausing/resuming
  • reporting progress of the task, as an integer between 0 and 100.
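To give an idea of the shape of such an interface, here is a simplified sketch. It is an assumption-laden stand-in, not the actual ProgressingFuture: it extends the JDK's Future rather than Guava's future interface so that it stays self-contained, and the names PausableProgressFuture and SimpleProgressFuture are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical shape of a future that can be paused and reports its progress. */
interface PausableProgressFuture<T> extends Future<T> {
    void pause();

    void resume();

    boolean isPaused();

    /** Progress of the task, as an integer between 0 and 100. */
    int getProgress();
}

/** Minimal illustration: the worker computing the result is expected to
 *  check isPaused() and to call setProgress() as it goes. */
class SimpleProgressFuture<T> extends CompletableFuture<T>
        implements PausableProgressFuture<T> {
    private volatile boolean paused = false;
    private final AtomicInteger progress = new AtomicInteger(0);

    @Override
    public void pause() {
        paused = true;
    }

    @Override
    public void resume() {
        paused = false;
    }

    @Override
    public boolean isPaused() {
        return paused;
    }

    @Override
    public int getProgress() {
        return progress.get();
    }

    /** Called by the worker; values are clamped to the 0..100 range. */
    void setProgress(int percent) {
        progress.set(Math.max(0, Math.min(100, percent)));
    }
}
```

In this sketch the pause state is only a flag: the future does not stop anything by itself, and it is up to the code producing the result to honour it.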

I have used this future interface to offer async methods in the ChangeData and Grid interfaces, such as ChangeData.saveToFileAsync. This has the nice effect of simplifying the interface quite a bit: before, we had to pass a ProgressReporter object as argument, which was a bit convoluted.

Following that change in the interfaces, I had to propagate that to all implementations, their common test suite, and the eventual users of those new async methods. Apache Spark does not support pausing jobs, so our Spark-based runner will simply ignore the pause button. The testing runner uses eager single-threaded evaluation so it will also ignore this feature. Most of the work therefore lies in the local runner (the default one).

In the local runner, the process of saving change data on disk (which triggers its computation) will attempt to pause after each row in the project, via the TaskSignalling.yield() method. And that happens regardless of how the change data is fetched itself.

It feels like a sensible choice to me, although it does mean that when implementing a long-running process (generally by providing a RowChangeDataProvider), one cannot provide other pausing points inside the computation itself. I don’t think it should be too limiting though.

Implementation of change data recovery after restart / crash

On its own, this was a fairly small change, but mostly because the whole architecture was already designed with that use case in mind.

The general scenario is as follows (I might migrate this part of the post to the technical documentation if this turns out to be the approach we want to go for). It might look a bit convoluted but I think it should be rather reliable.

Initial fetching of change data

While the original process (such as reconciliation) is running, it is writing its results as it gets them into a dedicated directory of the project storage. The process will generally run on multiple threads which each write their results in their own file (corresponding to a partition of the project data). The directory structure looks like this (seen from the project directory):

./changes/1680105382616/recon
./changes/1680105382616/recon/part-00000.gz
./changes/1680105382616/recon/part-00001.gz
./changes/1680105382616/recon/part-00002.gz
./changes/1680105382616/recon/part-00003.gz

Those files are gzip’ed on the fly as they are written. Their contents are newline-delimited pairs of a row id and the corresponding value fetched (such as a reconciled cell).
The file contents are flushed to disk after each line. This is something we will likely want to make configurable, because it might be too aggressive if rows are computed very quickly. Also, it has a non-negligible impact on the size of the compressed files, because forcing the compressor to flush its output often means that it cannot optimize as much as with a larger buffer.
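The trade-off between flushing often and compressing well can be observed with the JDK's own gzip support. The sketch below is only an illustration: the class name PartitionWriter is hypothetical, it writes to an in-memory buffer instead of a part file, and the line format is left to the caller.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPOutputStream;

/** Hypothetical writer, for illustration only. */
class PartitionWriter {
    /** Gzips the lines and returns the compressed bytes. */
    static byte[] writeLines(List<String> lines, boolean flushEachLine) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // syncFlush = true: flush() forces the compressor to emit pending data,
        // so that everything written so far actually reaches the sink.
        try (Writer writer = new OutputStreamWriter(
                new GZIPOutputStream(sink, true), StandardCharsets.UTF_8)) {
            for (String line : lines) {
                writer.write(line);
                writer.write('\n');
                if (flushEachLine) {
                    writer.flush();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }
}
```

Calling writeLines on the same input with flushEachLine set to true and then to false, and comparing the lengths of the two arrays, gives a quick measure of the size overhead for a given dataset.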

The process is interrupted

The interruption can be quite arbitrary: the writing of the partition files can stop at any step, meaning that they will likely not be valid gzip files. This is not a problem because by decompressing those files in a streaming fashion we are still able to recover everything but the last few lines.
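Here is a minimal sketch of how complete lines can be recovered from a truncated gzip stream with the JDK alone. It is not the code used in the prototype: the class name TruncatedGzip is hypothetical, and for brevity it works on byte arrays held in memory rather than streaming from a file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper, for illustration only. */
class TruncatedGzip {
    /** Helper for the demonstration: gzips lines, flushing after each one. */
    static byte[] compress(List<String> lines) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(sink, true)) {
            for (String line : lines) {
                gzip.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                gzip.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Returns every complete line that can be decompressed before the data runs out. */
    static List<String> readCompleteLines(byte[] possiblyTruncated) {
        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
        // Raw bytes are copied as soon as they are decompressed (no buffering Reader
        // in between), so nothing already decoded is lost when the stream fails.
        try (GZIPInputStream gzip =
                new GZIPInputStream(new ByteArrayInputStream(possiblyTruncated))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                decoded.write(buffer, 0, n);
            }
        } catch (EOFException e) {
            // Expected for a truncated file: keep what was decoded so far.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] bytes = decoded.toByteArray();
        int end = bytes.length;
        while (end > 0 && bytes[end - 1] != '\n') {
            end--; // drop a trailing line that was cut before its newline
        }
        List<String> lines = new ArrayList<>();
        if (end > 0) {
            String text = new String(bytes, 0, end - 1, StandardCharsets.UTF_8);
            lines.addAll(Arrays.asList(text.split("\n", -1)));
        }
        return lines;
    }
}
```

Passing a prefix of the output of compress to readCompleteLines returns the first lines intact and silently drops whatever was cut off at the end.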

OpenRefine starts again

When the project is loaded again, OpenRefine attempts to reconstruct the grid in the state it was left in. Because this grid depends on the change data object, it attempts to load that object. It checks whether there is a _SUCCESS file at ./changes/1680105382616/recon/_SUCCESS, but there is not. The presence of this empty file would have indicated that all partitions of the change data had been fetched successfully already.

The incomplete change data is moved to a dedicated directory

Because the change data is incomplete, we first move it to a dedicated directory in the project storage. It now lives in incomplete_changes instead of changes:

./incomplete_changes/1680105382616/recon
./incomplete_changes/1680105382616/recon/part-00000.gz
./incomplete_changes/1680105382616/recon/part-00001.gz
./incomplete_changes/1680105382616/recon/part-00002.gz
./incomplete_changes/1680105382616/recon/part-00003.gz
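The check for the _SUCCESS marker and the move to incomplete_changes could be sketched as follows. The class, method and parameter names (IncompleteChangeData, moveIfIncomplete, changeId, dataId) are hypothetical; only the directory layout and the marker file name come from the description above. The sketch does not handle the case where an older incomplete copy already exists at the target location.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper, for illustration only. */
class IncompleteChangeData {
    /**
     * Moves changes/<changeId>/<dataId> to incomplete_changes/<changeId>/<dataId>
     * if it exists and has no _SUCCESS marker. Returns the new location,
     * or null if there was nothing to move.
     */
    static Path moveIfIncomplete(Path projectDir, String changeId, String dataId) {
        Path changeDir = projectDir.resolve("changes").resolve(changeId).resolve(dataId);
        if (!Files.isDirectory(changeDir) || Files.exists(changeDir.resolve("_SUCCESS"))) {
            return null; // absent, or already fetched completely
        }
        Path target = projectDir.resolve("incomplete_changes")
                .resolve(changeId).resolve(dataId);
        try {
            Files.createDirectories(target.getParent());
            // A plain rename, since both directories live in the same project storage.
            Files.move(changeDir, target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }
}
```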

The fetching process restarts

We then restart a fetching process. Instead of simply reading the grid and producing its change data (as it would if it were starting from scratch), it reads the join of the grid and of the incomplete change data. For all rows where there is already a result fetched in the incomplete change data, it does not run its own fetching process and simply reuses that data instead.

This means that producing the first lines of the change data is very quick: it only consists of copying the previous incomplete results.
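Stripped of partitioning and laziness, the logic of that join can be sketched like this. It is a simplification under stated assumptions: rows are represented by their ids only, the recovered data is an in-memory map, and the names ResumableFetch and fetchAll are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper, for illustration only. */
class ResumableFetch {
    /**
     * Produces a value for every row id, reusing the recovered value when there
     * is one and calling the (expensive) fetcher only for the remaining rows.
     */
    static <V> Map<Long, V> fetchAll(
            List<Long> rowIds, Map<Long, V> recovered, Function<Long, V> fetcher) {
        Map<Long, V> result = new LinkedHashMap<>();
        for (Long rowId : rowIds) {
            if (recovered.containsKey(rowId)) {
                result.put(rowId, recovered.get(rowId)); // cheap: copy the earlier result
            } else {
                result.put(rowId, fetcher.apply(rowId)); // expensive: fetch it now
            }
        }
        return result;
    }
}
```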

That new change data is stored again in ./changes/1680105382616/recon/. This is combined with the rest of the change data objects and grid to produce the current grid shown to the user.

The incomplete change data is deleted

Once the change data is fetched completely, the _SUCCESS file is created and the incomplete change data is deleted.
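This final step could look like the sketch below, again with hypothetical names (ChangeDataCompletion, markComplete) and using only java.nio.file. The marker is written first, so that a crash between the two steps leaves a complete change data directory plus some leftover files, rather than the opposite.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper, for illustration only. */
class ChangeDataCompletion {
    /** Writes the empty _SUCCESS marker, then deletes the incomplete copy. */
    static void markComplete(Path changeDir, Path incompleteDir) {
        try {
            Files.createFile(changeDir.resolve("_SUCCESS"));
            if (Files.exists(incompleteDir)) {
                List<Path> toDelete;
                try (Stream<Path> paths = Files.walk(incompleteDir)) {
                    // Reverse order lists the files before their parent directories.
                    toDelete = paths.sorted(Comparator.reverseOrder())
                            .collect(Collectors.toList());
                }
                for (Path path : toDelete) {
                    Files.delete(path);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```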

General polishing and speed optimizations

I also spent quite some time just “dog-fooding” the prototype myself, that is to say trying to use it for some data cleaning projects. Since implementing partial results of long-running operations, I have disabled all in-memory caching of project data, so everything is read from disk, and it was unreasonably slow.

One way to solve the problem would of course be to re-introduce caching, but this is actually a good opportunity to optimize a lot of places (including ones that will remain beneficial even with caching on).

I have sped up the way we parse lines in grid and change data files. It used to rely on a self-made reader (LineReader), a replacement for the standard LineNumberReader which has tighter control over how many bytes are read from the underlying InputStream. This is necessary in some cases for partitioning purposes (being able to have multiple threads start reading the file at different offsets), but my version is definitely not efficient enough, and that caused noticeable lag in the UI.

For now, I have switched back to LineNumberReader when reading a compressed file (which is the case for all files internal to project serialization), but it would still be worth investigating whether LineReader can be made more efficient, because it would in turn speed up some importers like the CSV/TSV or line-based parsers.

This optimization effort was an occasion to try out the JMH benchmark framework that @tfmorris introduced a while back. It was helpful to evaluate the loading time of a grid in various circumstances, comparing it between runners for instance.

What’s next?

Following my work plan, this task of pausing and resuming long-running operations was the last one I could tackle without introducing column dependency metadata in the model. Although I am excited to get to that, I think I will first try to stabilize and optimize the current prototype more, ideally to get to a state where I feel comfortable doing more external user testing.

The next optimization I want to work on is related to the counting of records: at the moment, a lot of the slowdowns of the UI are caused by OpenRefine re-counting the number of records after some change, even if we are not in records mode (in fact, that number of records is not displayed anywhere in the UI). This is related to issue #5661 and is a good opportunity to improve the UX around the records mode.
