Partial results of long-running operations

Here is some news on the reproducibility project. After going for rather low-hanging fruit in January, in February I started work on a bigger task: supporting the visualization of partial results of long-running operations. This work is available in the 5541-partial-change-support branch of my fork.

I made a quick video to demonstrate the intended use case:


For now this really is a proof of concept of the feature, a validation that it can be done relatively naturally with the new architecture. I am hoping this will also prompt us to think about design questions around this feature, as mentioned in the video. The ones I am thinking about for now are:

  • how to convey the right expectations around the sequencing of updates to the project data (for instance, editing a cell before the reconciliation operation reaches its row will not influence the reconciled value)
  • how to change the way processes are displayed in the UI, making space for multiple processes and more options to pause, resume, cancel and rerun them
  • (not mentioned in the video) how to update the data shown in the grid in a non-disruptive way (for now, the grid is only updated when changing pagination settings, but it’s really just a hack)
  • how to update the representation of the history, to associate running processes with history entries.

Another exciting consequence of this work (also not mentioned in the video) concerns the Wikibase extension. The internal architectural changes for this feature forced me to change the Wikibase upload operation quite a bit. Until now, this operation first gathered all edits generated by the entire project, then optimized them so that edits to a given entity were grouped together (resulting in at most two edits per entity, generally a single one), and then carried out those edits. I originally made this choice to minimize the number of edits made (since Wikibase edits are costly), but it conflicts with the new architecture because it requires reading the entire dataset before making any edit. I was therefore more or less forced to change the Wikibase upload operation so that it does its edit grouping at a smaller scale, using a configurable batch size. This means that we will be making slightly more edits (since edits generated by rows that are far apart will no longer be grouped together), but it has the very big benefit of making it much easier to keep track of which row generated which edit.

This opens the door to in-grid error reporting for the Wikibase upload operation. We could change the operation so that it stores any editing errors it encounters in a dedicated column, making it much easier for users to figure out which parts of their datasets were actually uploaded and which errors prevented the rest of the dataset from being uploaded. I think the lack of error reporting in this operation is really bad and is a major obstacle to its broader adoption, especially for Wikimedia Commons, where upload errors are very common. I think the ability to pause and resume processes should also be useful in that context.
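
To illustrate the batching idea with a rough sketch (placeholder types and names, not the actual extension code): within one batch of rows, edits targeting the same entity are still merged into a single edit, and each merged edit keeps track of the rows that produced it, which is what makes per-row error reporting possible.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: grouping the edits generated by one batch of rows per target entity,
// while remembering which rows contributed to each merged edit.
public class BatchedEditGroupingSketch {

    /** Maps each entity id to the ids of the rows in this batch that edit it. */
    public static Map<String, List<Long>> groupByEntity(Map<Long, String> rowToEntityId) {
        Map<String, List<Long>> grouped = new LinkedHashMap<>();
        for (Map.Entry<Long, String> entry : rowToEntityId.entrySet()) {
            grouped.computeIfAbsent(entry.getValue(), id -> new ArrayList<>())
                   .add(entry.getKey());
        }
        // At most one merged edit per entity per batch; an upload error on that edit
        // can be reported back on exactly these rows.
        return grouped;
    }
}
```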

With this prototype out the door, I am thinking about starting to look for a designer to help with the questions raised above (and other design questions in this project). We have some budget planned for such a role in the grant for the reproducibility project. But obviously, the community should have its say in where this is going, so I would be curious to hear what people think of it already at this stage. Do you recognize the need for this? What workflows would you use it for? I am all ears :slight_smile:


Thank you for taking the time to write your thoughts down and produce the video.

Actually, I would prefer the processes to run so fast that we would not have to deal with these issues at all… but reality tells a different story :grin:

For what it is worth, I think your thoughts (and implementation) are going in the right direction.


Here is another monthly update on this front.

This month I worked on various tasks. The most salient aspect from a user perspective is the ability to pause and resume long-running operations. When you reconcile, fetch data from URLs, fetch data from reconciled values or upload data to Wikibase, you now have a pause button. What is perhaps more interesting is that if OpenRefine shuts down in the middle of a long-running process, it will be able to pick it back up where it left off after restart. This is designed to work even if OpenRefine is shut down in not-so-clean ways, to cover cases such as power outages. In those cases, you might lose a few rows' worth of work in the operation results, but you should generally be able to recover most of the results, as long as your HDD/SSD coped with the interruption nicely.

Once again, I made a video about it:

Here are some details about the more technical aspects of the work, on which your feedback would be welcome:

Implementation of pausing

While the idea of pausing a process is a rather simple one, implementing it was quite some work. There is no simple way to tell an arbitrary Java thread to pause its work: the code running in that thread must be designed to check by itself whether it should pause, and do so when some condition is met. That's actually useful, because it helps make sure that the thread pauses in a meaningful state (for instance, not when it's in the middle of talking to some web server). The challenge was to implement this pausing in a way that lets extensions define their own fetching processes (which should also benefit from the pause feature) while staying compatible with the Runner / Grid / ChangeData interfaces, so that the pausing functionality works with different runner implementations.
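
To make the pattern concrete, here is a minimal sketch of such a cooperative pausing point (the class and method names are illustrative, not OpenRefine's actual API): the worker thread calls a check method at points where pausing is safe, and blocks there while the process is paused.

```java
// Minimal sketch of cooperative pausing (illustrative names, not OpenRefine's actual API).
public class PauseSignal {

    private boolean paused = false;

    public synchronized void pause() {
        paused = true;
    }

    public synchronized void resume() {
        paused = false;
        notifyAll();
    }

    /** Called by the worker thread at safe points (e.g. between rows); blocks while paused. */
    public synchronized void awaitIfPaused() throws InterruptedException {
        while (paused) {
            wait();
        }
    }
}
```

A fetching loop would call `awaitIfPaused()` between two units of work, which is exactly what guarantees the "meaningful state" mentioned above: the pause can never happen in the middle of a single unit.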

I went for the approach of adding some asynchronous versions of the methods offered by the Grid and ChangeData interfaces, similarly to what Spark does.

An async method is a method which returns immediately, instead of blocking until its result is available. The corresponding computation happens in the background (meaning, in another thread) and the return value of the method is a Future object, which is essentially a handle on the eventual result. Futures are a standard concept in many programming languages and have made their way into the JDK as well, as java.util.concurrent.Future. Beyond giving access to the eventual return value of the computation, they also provide some limited capabilities to act on it, such as cancelling it.
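
As a minimal illustration with the plain JDK (not OpenRefine code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // submit() returns immediately; the computation runs in a background thread
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(1000); // stand-in for a long-running fetch
            return 42;
        });
        // get() blocks until the result is available; future.cancel(true) would interrupt it
        System.out.println(future.get());
        executor.shutdown();
    }
}
```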

On its own, this interface is a bit limiting: there is no way to pause a future, for instance. So I have followed the example of the Guava library, which provides an extended Future interface with the ability to register a callback. I have done the same and made a ProgressingFuture, which extends Guava's ListenableFuture and adds support for two things:

  • pausing/resuming
  • reporting progress of the task, as an integer between 0 and 100.

I have used this future interface to offer async methods in the ChangeData and Grid interfaces, such as ChangeData.saveToFileAsync. This has the nice effect of simplifying the interface quite a bit: before, we had to pass a ProgressReporter object as an argument, which was a bit convoluted.
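
To give an idea of the shape of this interface, here is a rough sketch (the method names are illustrative and may differ from the actual ones in the codebase):

```java
import java.util.function.IntConsumer;

import com.google.common.util.concurrent.ListenableFuture;

// Rough sketch of a pausable, progress-reporting future on top of Guava's ListenableFuture.
// Method names are illustrative; the actual interface may differ.
public interface ProgressingFutureSketch<T> extends ListenableFuture<T> {

    /** Asks the underlying task to pause at its next safe pausing point. */
    void pause();

    /** Resumes a paused task. */
    void resume();

    /** Whether the task is currently paused. */
    boolean isPaused();

    /** Current progress of the task, as an integer between 0 and 100. */
    int getProgress();

    /** Registers a listener notified whenever the reported progress changes. */
    void onProgress(IntConsumer listener);
}
```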

Following that change in the interfaces, I had to propagate it to all implementations, their common test suite, and the eventual users of those new async methods. Apache Spark does not support pausing jobs, so our Spark-based runner will simply ignore the pause button. The testing runner uses eager single-threaded evaluation, so it will also ignore this feature. Most of the work therefore lies in the local runner (the default one).

In the local runner, the process of saving change data to disk (which triggers its computation) will attempt to pause after each row in the project, via the TaskSignalling.yield() method. This happens regardless of how the change data itself is fetched.

It feels like a sensible choice to me, although it does mean that when implementing a long-running process (generally by providing a RowChangeDataProvider), one cannot provide additional pausing points inside the computation itself. I don't think this should be too limiting, though.

Implementation of change data recovery after restart / crash

On its own, this was a fairly small change, but mostly because the whole architecture was already designed with that use case in mind.

The general scenario is as follows (I might migrate this part of the post to the technical documentation if this turns out to be the approach we want to go with). It might look a bit convoluted, but I think it should be rather reliable.

Initial fetching of change data

While the original process (such as reconciliation) is running, it writes its results, as it obtains them, into a dedicated directory of the project storage. The process will generally run on multiple threads, each of which writes its results to its own file (corresponding to a partition of the project data). The directory structure looks like this (seen from the project directory):

./changes/1680105382616/recon
./changes/1680105382616/recon/part-00000.gz
./changes/1680105382616/recon/part-00001.gz
./changes/1680105382616/recon/part-00002.gz
./changes/1680105382616/recon/part-00003.gz

Those files are gzip'ed on the fly as they are written. Their contents are newline-delimited pairs of a row id and the corresponding fetched value (such as a reconciled cell).
The file contents are flushed to disk after each line. This is something we will likely want to make configurable, because it might be too aggressive if rows are computed very quickly. It also has a non-negligible impact on the size of the compressed files, because forcing the compressor to flush its output often means that it cannot compress as well as with a larger buffer.
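
For illustration, here is a minimal sketch of that write pattern, using the JDK's GZIPOutputStream with sync-flush enabled (the file name and serialized value format are simplified placeholders):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Sketch only: newline-delimited "rowId,value" pairs written to a gzip'ed partition file,
// flushed after each line so that a crash loses at most the last few lines.
public class PartitionWriterSketch {

    public static void main(String[] args) throws IOException {
        File partFile = new File("part-00000.gz");
        // syncFlush = true makes flush() push buffered data through the compressor
        try (Writer writer = new OutputStreamWriter(
                new GZIPOutputStream(new FileOutputStream(partFile), true),
                StandardCharsets.UTF_8)) {
            for (long rowId = 0; rowId < 5; rowId++) {
                writer.write(rowId + ",{\"recon\":\"Q" + rowId + "\"}\n");
                writer.flush(); // costs compression ratio, but bounds data loss on a crash
            }
        }
    }
}
```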

The process is interrupted

The interruption can be quite arbitrary: the writing of the partition files can stop at any point, meaning that they will likely not be valid gzip files. This is not a problem: by decompressing those files in a streaming fashion, we are still able to recover everything but the last few lines.
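
Here is a simplified sketch of such a streaming read, which keeps every complete line recovered before the truncation point:

```java
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

// Sketch only: reading a possibly-truncated gzip partition file in a streaming fashion.
public class TruncatedGzipReaderSketch {

    public static List<String> readRecoverableLines(File partFile) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(partFile)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (EOFException e) {
            // Expected when the file was cut off mid-stream: keep what was read so far.
        }
        return lines;
    }
}
```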

OpenRefine starts again

When the project is loaded again, OpenRefine attempts to reconstruct the grid in the state it was left in. Because this grid depends on the change data object, it attempts to load that object. It checks whether there is a _SUCCESS file at ./changes/1680105382616/recon/_SUCCESS, but there is none. The presence of this empty file would have indicated that all partitions of the change data had already been fetched successfully.

The incomplete change data is moved to a dedicated directory

Because the change data is incomplete, we first move it to a dedicated directory in the project storage. It now lives in incomplete_changes instead of changes:

./incomplete_changes/1680105382616/recon
./incomplete_changes/1680105382616/recon/part-00000.gz
./incomplete_changes/1680105382616/recon/part-00001.gz
./incomplete_changes/1680105382616/recon/part-00002.gz
./incomplete_changes/1680105382616/recon/part-00003.gz

The fetching process restarts

We then restart a fetching process. Instead of simply reading the grid and producing its change data (as it would if it were starting from scratch), it reads the join of the grid and the incomplete change data. For all rows for which a result was already fetched in the incomplete change data, it does not run its own fetching and simply reuses that data instead.

This means that producing the first lines of the change data is very quick: it only consists of copying the previous incomplete results.
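
In rough pseudo-Java (placeholder types, not the actual Runner API), the per-row logic of that join looks like this:

```java
import java.util.Map;
import java.util.function.LongFunction;

// Sketch only: reuse the value recovered from the incomplete change data when present,
// and only call the expensive fetcher for rows that were never computed.
public class ChangeDataRecoverySketch {

    public static <T> T valueForRow(long rowId,
                                    Map<Long, T> incompleteChangeData,
                                    LongFunction<T> fetcher) {
        T recovered = incompleteChangeData.get(rowId);
        if (recovered != null) {
            return recovered; // cheap: copied from the previous, interrupted run
        }
        return fetcher.apply(rowId); // expensive: e.g. an actual reconciliation call
    }
}
```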

That new change data is stored again in ./changes/1680105382616/recon/. It is combined with the rest of the change data objects and the grid to produce the current grid shown to the user.

The incomplete change data is deleted

Once the change data is fetched completely, the _SUCCESS file is created and the incomplete change data is deleted.

General polishing and speed optimizations

I also spent quite some time just “dog-fooding” the prototype myself, that is to say trying to use it for some data cleaning projects. Because, since the implementation of partial results of long-running operations, I have disabled all in-memory caching of project data, everything is done off disk, and it was unreasonably slow.

One way to solve the problem would of course be to re-introduce caching, but this is actually a good opportunity to optimize a lot of places (including optimizations that will remain beneficial even with caching on).

I have sped up the way we parse lines in grid and change data files. It used to rely on a self-made reader (LineReader), a replacement for the standard LineNumberReader which gives tighter control over how many bytes are read from the underlying InputStream. This is necessary in some cases for partitioning purposes (being able to have multiple threads start reading the file at different offsets), but my version is definitely not efficient enough, and that caused noticeable lag in the UI.

For now, I have switched back to LineNumberReader when reading a compressed file (which is the case for all files internal to project serialization), but it would still be worth investigating whether LineReader can be made more efficient, because that would in turn speed up some importers, such as the CSV/TSV or line-based parsers.

This optimization effort was an occasion to try out the JMH benchmark framework that @tfmorris introduced a while back. It was helpful for evaluating the loading time of a grid in various circumstances, for instance comparing it between runners.
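
For reference, a benchmark in that style looks roughly like this (the workload below is a placeholder, not the actual grid-loading code):

```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

// Sketch only: the real benchmarks load a serialized grid with different runners;
// the loop below is just a placeholder to show the JMH structure.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class GridLoadingBenchmarkSketch {

    @Param({ "local", "testing" })
    public String runner;

    @Benchmark
    public long loadGrid() {
        long checksum = 0;
        for (int i = 0; i < 1_000; i++) {
            checksum += i * runner.length(); // placeholder work
        }
        return checksum;
    }
}
```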

What’s next?

Following my work plan, this task of pausing and resuming long-running operations was the last one I could tackle without introducing column dependency metadata in the model. Although I am excited to get to that, I think I will first try to stabilize and optimize the current prototype more, ideally to get to a state where I feel comfortable doing more external user testing.

The next optimization I want to work on is related to the counting of records: at the moment, a lot of the slowdowns of the UI are caused by OpenRefine re-counting the number of records after some change, even if we are not in records mode (in fact, that number of records is not displayed anywhere in the UI). This is related to issue #5661 and is a good opportunity to improve the UX around the records mode.


Oh this is very interesting! I wonder if this could be used to support gracefully shutting down an OpenRefine instance? Today it seems to be a matter of making sure OpenRefine is not up to anything, but I guess the above could be used for an actual command which one could, for example, call from systemd?

Thanks! What do you mean by “actual command”? For now, the way to shut down OpenRefine is to send it a signal such as SIGINT (Ctrl-C in the terminal), so the goal of this work is indeed to ensure it does not incur data loss (and as little data loss as possible with harsher signals or other forms of termination) even if some long-running operations are running.

I am not sure how that ties in with systemd - I guess one could write a wrapper which would start and stop OpenRefine as a service, and stopping would likely be done by sending SIGINT indeed. Or are you suggesting that OpenRefine itself should offer a different shutting down mechanism? What would it look like?

I could imagine a refine stop command which would be able to send SIGTERM to non-foreground processes. It’s very much a “nice to have”, as I expect it would do something very similar to what systemd does by default (SIGTERM falling back to SIGKILL), but with the benefit of being entirely independent of the service runner.

I guess it could be useful to desktop users as well, as I have noticed that OpenRefine can, under certain circumstances, end up running in the background.

The prototype described above had one big usability flaw: you had to manually trigger a refresh of the grid (for instance by changing the number of rows shown) for it to update. There was also no indication in the grid that the view shown was only an intermediate state of computation and that more data was coming.

As I wrote earlier, there are many ways we could change that. I have been exploring one, which consists of showing which cells are still being computed. You can try it out on the 4.0 branch; it looks like this:

(animated screenshot: reconciliation1)

The basic principle is simple: when joining a change data object with the grid to obtain the state of the grid after a long-running operation, cells which depend on missing parts of the change data (i.e. rows where the operation has not been computed yet) are marked as “pending” and serialized as such.
As long as there are pending cells in the current view, the frontend periodically refreshes the grid.
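
Sketched with placeholder types (not the actual cell representation), the per-cell logic of that join is roughly the following:

```java
import java.util.Map;

// Sketch only: when joining a row with change data, a cell whose row has not been
// processed yet is marked as pending, which the frontend uses to keep polling.
public class PendingCellSketch {

    /** Stand-in marker for a cell whose value is still being computed. */
    public static final Object PENDING_CELL = new Object();

    public static Object joinCell(long rowId,
                                  Map<Long, Object> changeData,
                                  boolean rowAlreadyProcessed) {
        Object value = changeData.get(rowId);
        if (value != null) {
            return value; // the operation already produced a value for this row
        }
        if (rowAlreadyProcessed) {
            return null; // processed, but no value (for instance, no match found)
        }
        return PENDING_CELL; // not computed yet: serialize as "pending", render a spinner
    }
}
```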

This solves the basic problem of knowing when to refresh the grid, but the refreshing itself could still be improved:

  • Potentially, we could refresh only the cells that were pending, not the entire grid. This would be more efficient and would give a more fluid experience when, for instance, editing a cell in a column that is not being computed. The fact that we do not use a reactive frontend makes this a bit cumbersome, but it is still doable.
  • Refreshing the grid can change the width of columns, which can also be disruptive if the user is interacting with other cells while the grid is refreshed. This is why I started looking into making the grid layout more predictable.

Many UX options are still on the table:

  • Do not add a spinner to pending cells (like in the previous prototype) but just use this information internally to determine if the grid should be refreshed;
  • Make the spinner not fully opaque, to show the previous value of the cell (before the operation); or, more generally, render those pending cells in any other way.
  • Do not refresh automatically, but instead create a notification at the top of the screen (for instance using our yellow bubbles) signifying that the operation has progressed and the grid can be refreshed. The user would then click “Refresh” in the notification to trigger the refresh when it suits them. Or give the user the option to switch between this manual refreshing and auto-refresh.
  • Mark entire rows/records as being pending if any cell in them is. Or hide them entirely: only show rows/records that are complete. This would mean that the grid would initially look empty after starting a long-running operation, and it would fill itself up gradually.
  • Other approaches? What would be your preferred one?

Although the current prototype works okay as far as I can tell from interactive testing, I am still struggling to get the corresponding Cypress tests to pass reliably. In my experience, that's a sign that the strategy used is not fully bulletproof and that there can be races between various processes, so I am investigating that.

But that should not prevent you from taking it for a spin already! :slight_smile: