Skip to content

API

cli

cli(context)

Create and manage Google Cloud Workstation.

Source code in src/workstation/cli/__init__.py
21
22
23
24
25
@group_wrapper(name="workstation")
@click.version_option(package_name="cloud-workstation")
@click.pass_context
def cli(context: click.Context):
    """Create and manage Google Cloud Workstation."""

crud

crud module provides command-line interface (CLI) commands for managing workstations.

Include functionalities to create, delete, list, start, and stop workstations, as well as manage workstation configurations.

Functions:

Name Description
get_gcloud_config

Retrieve GCP configuration details including project, location, and account.

common_options

Apply common CLI options to commands.

create

Create a workstation.

list_configs

List workstation configurations.

list

List workstations.

start

Start workstation and optionally open it either locally with VSCode or through VSCode in a browser.

stop

Stop workstation.

delete

Delete workstation.

sync

Sync files to workstation.

logs

Open logs for the workstation.

common_options(func)

Apply common CLI options to commands.

Parameters:

Name Type Description Default
func callable

The function to apply the options to.

required

Returns:

Type Description
callable

The function with common options applied.

Source code in src/workstation/cli/crud.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def common_options(func):  # noqa: D103
    """
    Apply common CLI options to commands.

    Parameters
    ----------
    func : callable
        The function to apply the options to.

    Returns
    -------
    callable
        The function with common options applied.
    """
    for option in reversed(_common_options):
        func = option(func)
    return func

create(context, cluster, config, location, name, project, proxy, no_proxy, envs, **kwargs)

Create a workstation.

Source code in src/workstation/cli/crud.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
@command()
@common_options
@click.option(
    "--name",
    help="Name of the workstation to create.",
    type=str,
    metavar="<str>",
)
@click.option(
    "--proxy",
    help="proxy setting.",
    type=str,
    metavar="<str>",
)
@click.option(
    "--no-proxy",
    help="No proxy setting.",
    type=str,
    metavar="<str>",
)
@click.option(
    "--env",
    "-e",
    "envs",
    type=(str, str),
    multiple=True,
    help="Environment variables to set at runtime.",
    metavar="<key value>",
)
@click.pass_context
def create(
    context: click.Context,
    cluster: str,
    config: str,
    location: Optional[str],
    name: str,
    project: Optional[str],
    proxy: Optional[str],
    no_proxy: Optional[str],
    envs: Tuple[Tuple[str, str]],
    **kwargs,
):
    """Create a workstation."""
    # Make sure the user is authenticated
    check_gcloud_auth()

    project, location, account = get_gcloud_config(project=project, location=location)

    # Ensure USER is set on laptop
    user = getpass.getuser()

    try:
        from block.mlds.proxy.block import Proxy

        proxies = Proxy(project=project, name=name)
        proxy = proxies.proxy
        no_proxy = proxies.no_proxy
    except ImportError:
        pass

    if config_manager.check_if_config_exists(name):
        console.print(f"Workstation config for {name} already exists.")
        overwrite = Confirm.ask("Overwrite config?")
        if not overwrite:
            console.print(f"Exiting without creating workstation {name}.")
            sys.exit(0)

    _ = create_workstation(
        cluster=cluster,
        config=config,
        name=name,
        user=user,
        account=account,
        project=project,
        location=location,
        proxy=proxy,
        no_proxy=no_proxy,
        envs=envs,
    )

    config_manager.write_ssh_config(
        name=name,
        user=user,
        cluster=cluster,
        region=location,
        project=project,
        config=config,
    )

    console.print(f"Workstation {name} created.")

delete(context, **kwargs)

Delete workstation.

Source code in src/workstation/cli/crud.py
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
@command()
@click.option(
    "-n",
    "--name",
    help="Name of the workstation to delete.",
    type=str,
    metavar="<str>",
)
@click.pass_context
def delete(context: click.Context, **kwargs):
    """Delete workstation."""
    # Make sure the user is authenticated
    check_gcloud_auth()

    workstation_details = config_manager.read_configuration(kwargs["name"])

    response = delete_workstation(**workstation_details)
    config_manager.delete_configuration(kwargs["name"])
    if response.state.value == 0:
        console.print(f"Workstation {kwargs['name']} deleted.")

get_gcloud_config(project, location)

Retrieve GCP configuration details including project, location, and account.

Parameters:

Name Type Description Default
project str

GCP project name.

required
location str

GCP location.

required

Returns:

Type Description
tuple

A tuple containing project, location, and account details.

Source code in src/workstation/cli/crud.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def get_gcloud_config(project: Optional[str], location: Optional[str]):  # noqa: D103
    """
    Retrieve GCP configuration details including project, location, and account.

    Parameters
    ----------
    project : str, optional
        GCP project name.
    location : str, optional
        GCP location.

    Returns
    -------
    tuple
        A tuple containing project, location, and account details.
    """
    config_project, config_location, account = read_gcloud_config()

    if project is None:
        if config_project is not None:
            project = config_project
        else:
            raise ValueError(
                "Project not found in gcloud config and was not passed in."
            )

    if location is None:
        if config_location is not None:
            location = config_location
        else:
            raise ValueError(
                "Location not found in gcloud config and was not passed in."
            )

    if account is None:
        raise ValueError("Account not found in gcloud config.")

    return project, location, account

list(context, project, location, all, user, export_json, cluster, **kwargs)

List workstations.

Source code in src/workstation/cli/crud.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
@command()
@common_options
@click.option(
    "--json",
    "export_json",
    default=False,
    is_flag=True,
    help="print json output",
)
@click.option(
    "-u",
    "--user",
    default=getpass.getuser(),
    help="Lists workstations only from a given user.",
)
@click.option(
    "-a", "--all", is_flag=True, default=False, help="List workstations from all users."
)
@click.pass_context
def list(
    context: click.Context,
    project: Optional[str],
    location: Optional[str],
    all: bool,
    user: str,
    export_json: bool,
    cluster: str,
    **kwargs,
):
    """List workstations."""
    # Make sure the user is authenticated
    check_gcloud_auth()

    project, location, account = get_gcloud_config(project=project, location=location)

    workstations = list_workstations(
        cluster=cluster,
        project=project,
        location=location,
    )

    results = []
    for workstation in workstations:
        if not all and workstation.get("env", {}).get("LDAP") != user:
            continue

        result = {
            "name": workstation["name"].split("/")[-1],
            "user": workstation["env"]["LDAP"],
            "project": workstation["project"],
            "location": workstation["location"],
            "config": workstation["config"]["name"].split("/")[-1],
            "cluster": workstation["cluster"],
            "state": workstation["state"].name,
            "idle_timeout": workstation["config"]["idle_timeout"],
            "max_runtime": workstation["config"]["max_runtime"],
            "type": workstation["config"]["machine_type"],
            "image": workstation["config"]["image"],
        }
        results.append(result)

    if export_json:
        json_data = json.dumps(results, indent=4)
        console.print(json_data)
    else:
        tree = Tree("Workstations", style="bold blue")

        for result in results:
            if result["state"] == "STATE_RUNNING":
                status = ":play_button: Running"
            elif result["state"] == "STATE_STOPPED":
                status = ":stop_sign: Stopped"
            elif result["state"] == "STATE_STARTING":
                status = ":hourglass: Starting"
            elif result["state"] == "STATE_STOPPING":
                status = ":hourglass: Stopping"
            else:
                status = ":question: State unknown"

            config_branch = tree.add(f"Workstation: {result['name']}")
            config_branch.add(f"{status}", style="white")
            config_branch.add(f"User: {result['user']}", style="white")
            config_branch.add(f":minidisc: Image: {result['image']}")
            config_branch.add(f":computer: Machine Type: {result['type']}")
            config_branch.add(
                f":hourglass_flowing_sand: Idle Timeout (s): {str(result['idle_timeout'])}"
            )
            config_branch.add(
                f":hourglass_flowing_sand: Max Runtime (s): {str(result['max_runtime'])}"
            )

        console.print(tree)
        console.print("Total Workstations: ", len(tree.children))

list_configs(context, project, location, cluster, **kwargs)

List workstation configurations.

Source code in src/workstation/cli/crud.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
@command()
@common_options
@click.pass_context
def list_configs(
    context: click.Context,
    project: Optional[str],
    location: Optional[str],
    cluster: str,
    **kwargs,
):
    """List workstation configurations."""
    # Make sure the user is authenticated
    check_gcloud_auth()

    project, location, account = get_gcloud_config(project=project, location=location)
    configs = list_workstation_configs(
        cluster=cluster,
        project=project,
        location=location,
    )

    console.print(config_tree(configs))

logs(name, project, **kwargs)

Open logs for the workstation.

Source code in src/workstation/cli/crud.py
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
@command()
@click.argument(
    "name",
    type=str,
)
@click.option(
    "--project",
    help="Name of the workstation GCP project.",
    type=str,
    metavar="<str>",
)
def logs(name: str, project: str, **kwargs):
    """Open logs for the workstation."""
    check_gcloud_auth()
    instances = get_instance_assignment(project=project, name=name)
    instance = instances.get(name, None)
    if instances is None:
        console.print(f"Workstation {name} not found.")
        return
    console.print(f"Logs for instance: {instance.get('instance_name')} opening")
    webbrowser.open(instance.get("logs_url"))

start(context, name, code, browser, **kwargs)

Start workstation and optionally open it either locally with VSCode or through VSCode in a browser.

Source code in src/workstation/cli/crud.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
@command()
@click.option(
    "-n",
    "--name",
    help="Name of the workstation to start.",
    type=str,
    metavar="<str>",
    required=True,
)
@click.option(
    "--code",
    help="Open workstation in VSCode locally. "
    "This requires setup illustrated in "
    "https://workstation.mlds.cash/#connect-to-a-workstation-with-local-vs-code",
    is_flag=True,
    default=False,
)
@click.option(
    "--browser",
    help="Open workstation with a remote VSCode session in a web browser.",
    is_flag=True,
    default=False,
)
@click.pass_context
def start(context: click.Context, name: str, code: bool, browser: bool, **kwargs):
    """Start workstation and optionally open it either locally with VSCode or through VSCode in a browser."""
    # Make sure the user is authenticated
    check_gcloud_auth()

    if code and browser:
        raise ValueError(
            "Select either local VSCode (--code) or remote VSCode in web browser (--browser)."
        )

    workstation_details = config_manager.read_configuration(name)

    response = start_workstation(**workstation_details)
    url = f"https://80-{response.host}"
    if not code and not browser:
        console.print(
            "Use --browser or --code to open the workstation in browser or vs code directly."
        )
        console.print(url)
    elif code:
        url = f"vscode://vscode-remote/ssh-remote+{name}/home/{getpass.getuser()}"
        console.print("Opening workstation in VSCode...")
        webbrowser.open(url)
    elif browser:
        console.print(f"Opening workstation at {url}...")
        webbrowser.open(url)

stop(context, **kwargs)

Stop workstation.

Source code in src/workstation/cli/crud.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
@command()
@click.option(
    "-n",
    "--name",
    help="Name of the workstation to stop.",
    type=str,
    metavar="<str>",
)
@click.pass_context
def stop(context: click.Context, **kwargs):
    """Stop workstation."""
    # Make sure the user is authenticated
    check_gcloud_auth()

    workstation_details = config_manager.read_configuration(kwargs["name"])
    response = stop_workstation(**workstation_details)
    console.print(response.name, response.state)

sync(context, name, **kwargs)

Sync files to workstation.

Source code in src/workstation/cli/crud.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
@command()
@click.option(
    "-n",
    "--name",
    help="Name of the workstation to sync.",
    type=str,
    metavar="<str>",
)
@click.pass_context
def sync(
    context: click.Context,
    name: str,
    **kwargs,
):
    """Sync files to workstation."""
    # Make sure the user is authenticated
    check_gcloud_auth()

    # TDOO: Add source and destination options
    source = "~/remote-machines/workstation/"
    destination = "~/"

    workstation_details = config_manager.read_configuration(name)

    result = sync_files_workstation(
        source=source,
        destination=destination,
        **workstation_details,
    )

    for line in result.stdout.split("\n"):
        console.print(line)
    if result.returncode != 0:
        console.print(result.args)
        console.print(result.stderr)

config

ConfigManager

A class to manage Workstation configurations.

Attributes:

Name Type Description
workstation_data_dir Path

The directory where workstation data is stored.

workstation_configs Path

The directory where individual workstation configurations are stored.

Methods:

Name Description
check_if_config_exists

Checks if a configuration file with the given name exists.

write_configuration

Writes the configuration to a YAML file and returns the path to it.

read_configuration

Reads the configuration for the given name and returns it as a dictionary.

delete_configuration

Deletes the configuration file and its corresponding YAML file for the given name.

write_ssh_config

Writes the SSH configuration for the workstation.

Source code in src/workstation/config.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
class ConfigManager:
    """A class to manage Workstation configurations.

    Attributes
    ----------
    workstation_data_dir : Path
        The directory where workstation data is stored.
    workstation_configs : Path
        The directory where individual workstation configurations are stored.

    Methods
    -------
    check_if_config_exists(name: str) -> bool
        Checks if a configuration file with the given name exists.
    write_configuration(project: str, name: str, location: str, cluster: str, config: str) -> Path
        Writes the configuration to a YAML file and returns the path to it.
    read_configuration(name: str) -> dict
        Reads the configuration for the given name and returns it as a dictionary.
    delete_configuration(name: str) -> None
        Deletes the configuration file and its corresponding YAML file for the given name.
    write_ssh_config(name: str, user: str, project: str, cluster: str, config: str, region: str)
        Writes the SSH configuration for the workstation.
    """

    def __init__(self):
        self.workstation_data_dir = Path.home() / ".workstations"
        self.workstation_configs = self.workstation_data_dir / "configs"

    def check_if_config_exists(self, name: str) -> bool:
        """Check if a configuration file with the given name exists.

        Parameters
        ----------
        name : str
            The name of the configuration to check.

        Returns
        -------
        bool
            True if the configuration exists, False otherwise.
        """
        return (self.workstation_configs / (name + ".yml")).exists()

    def write_configuration(
        self, project: str, name: str, location: str, cluster: str, config: str
    ) -> Path:
        """Write the configuration to a YAML file.

        Parameters
        ----------
        project : str
            The project name.
        name : str
            The name of the workstation.
        location : str
            The location of the workstation.
        cluster : str
            The cluster associated with the workstation.
        config : str
            The specific configuration settings.

        Returns
        -------
        Path
            The path to the written YAML file.

        Raises
        ------
        Exception
            If any error occurs during the writing process.
        """
        self.workstation_configs.mkdir(parents=True, exist_ok=True)

        current_dir = Path.cwd()
        os.chdir(self.workstation_configs)
        try:
            workstation = WorkstationConfig(
                project=project,
                name=name,
                location=location,
                cluster=cluster,
                config=config,
            )

            workstation_path = workstation.generate_workstation_yml()
            return self.workstation_configs / workstation_path
        except Exception as e:
            os.chdir(current_dir)
            raise e

    def read_configuration(self, name: str) -> dict:
        """Read the configuration for the given name.

        Parameters
        ----------
        name : str
            The name of the configuration to read.

        Returns
        -------
        dict
            The contents of the configuration file as a dictionary.

        Raises
        ------
        FileNotFoundError
            If the configuration file does not exist.
        KeyError
            If required keys are missing from the configuration file.
        """
        workstation_config = self.workstation_configs / (name + ".yml")

        if not workstation_config.exists():
            raise FileNotFoundError(
                f"Configuration {name} not found, please check if {workstation_config} exists."
            )

        with open(workstation_config, "r") as file:
            contents = yaml.safe_load(file)

        # check that project, name, location, cluster, and config are in the file
        # For the error say what keys are missing
        if not all(
            key in contents
            for key in ["project", "name", "location", "cluster", "config"]
        ):
            missing_keys = [
                key
                for key in ["project", "name", "location", "cluster", "config"]
                if key not in contents
            ]
            raise KeyError(f"Configuration file {name} is missing keys {missing_keys}")

        return contents

    def delete_configuration(self, name: str) -> None:
        """Delete the configuration file and its corresponding YAML file.

        Parameters
        ----------
        name : str
            The name of the configuration to delete.

        Raises
        ------
        FileNotFoundError
            If the configuration file does not exist.
        """
        workstation_yml = self.workstation_configs / (name + ".yml")
        workstation_config = self.workstation_configs / (name + ".config")

        if not workstation_config.exists():
            raise FileNotFoundError(f"Configuration {name} not found")
        if not workstation_yml.exists():
            raise FileNotFoundError(f"Configuration {name} not found")

        workstation_config.unlink()
        workstation_yml.unlink()

    def write_ssh_config(
        self,
        name: str,
        user: str,
        project: str,
        cluster: str,
        config: str,
        region: str,
    ):
        """Write the SSH configuration for the workstation.

        Parameters
        ----------
        name : str
            The name of the workstation.
        user : str
            The user for SSH connection.
        project : str
            The project name.
        cluster : str
            The cluster associated with the workstation.
        config : str
            The specific configuration settings.
        region : str
            The region where the workstation is deployed.

        Raises
        ------
        NoPortFree
            If no free port is found after checking 20 ports.
        """
        workstation_config = self.workstation_configs / (name + ".config")

        # get all of the ports that are currently in use from the config files
        ports = []
        for config_file in self.workstation_configs.glob("*.config"):
            with open(config_file, "r") as file:
                contents = file.read()
                # Check if the match is not None before calling group
                match = re.search(r"\n\s*Port\s+(\d+)", contents)
                if match is not None:
                    port = int(match.group(1))
                    ports.append(port)

        if len(ports) == 0:
            port = 6000
        else:
            port = max(ports) + 1

        for _ in range(20):
            if check_socket("localhost", port):
                break
            port += 1
        else:
            raise NoPortFree("Could not find a free port after checking 20 ports.")

        proxy_command = (
            "sh -c '"
            "cleanup() { pkill -P $$; }; "
            "trap cleanup EXIT; "
            "gcloud workstations start-tcp-tunnel "
            f"--project={project} "
            f"--cluster={cluster} "
            f"--config={config} "
            f"--region={region} "
            "--local-host-port=localhost:%p %h 22 & "
            "timeout=10; "
            "while ! nc -z localhost %p; do "
            "sleep 1; "
            "timeout=$((timeout - 1)); "
            "if [ $timeout -le 0 ]; then "
            "exit 1; "
            "fi; "
            "done; "
            "nc localhost %p'"
        )

        config_content = dedent(
            f"""
            Host {name}
                HostName {name}
                Port {port}
                User {user}
                StrictHostKeyChecking no
                UserKnownHostsFile /dev/null
                ControlMaster auto
                ControlPersist 30m
                ControlPath ~/.ssh/cm/%r@%h:%p
                ProxyCommand {proxy_command}
                """
        ).strip()

        with open(workstation_config, "w") as file:
            file.write(config_content)

check_if_config_exists(name)

Check if a configuration file with the given name exists.

Parameters:

Name Type Description Default
name str

The name of the configuration to check.

required

Returns:

Type Description
bool

True if the configuration exists, False otherwise.

Source code in src/workstation/config.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def check_if_config_exists(self, name: str) -> bool:
    """Check if a configuration file with the given name exists.

    Parameters
    ----------
    name : str
        The name of the configuration to check.

    Returns
    -------
    bool
        True if the configuration exists, False otherwise.
    """
    return (self.workstation_configs / (name + ".yml")).exists()

delete_configuration(name)

Delete the configuration file and its corresponding YAML file.

Parameters:

Name Type Description Default
name str

The name of the configuration to delete.

required

Raises:

Type Description
FileNotFoundError

If the configuration file does not exist.

Source code in src/workstation/config.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def delete_configuration(self, name: str) -> None:
    """Delete the configuration file and its corresponding YAML file.

    Parameters
    ----------
    name : str
        The name of the configuration to delete.

    Raises
    ------
    FileNotFoundError
        If the configuration file does not exist.
    """
    workstation_yml = self.workstation_configs / (name + ".yml")
    workstation_config = self.workstation_configs / (name + ".config")

    if not workstation_config.exists():
        raise FileNotFoundError(f"Configuration {name} not found")
    if not workstation_yml.exists():
        raise FileNotFoundError(f"Configuration {name} not found")

    workstation_config.unlink()
    workstation_yml.unlink()

read_configuration(name)

Read the configuration for the given name.

Parameters:

Name Type Description Default
name str

The name of the configuration to read.

required

Returns:

Type Description
dict

The contents of the configuration file as a dictionary.

Raises:

Type Description
FileNotFoundError

If the configuration file does not exist.

KeyError

If required keys are missing from the configuration file.

Source code in src/workstation/config.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def read_configuration(self, name: str) -> dict:
    """Read the configuration for the given name.

    Parameters
    ----------
    name : str
        The name of the configuration to read.

    Returns
    -------
    dict
        The contents of the configuration file as a dictionary.

    Raises
    ------
    FileNotFoundError
        If the configuration file does not exist.
    KeyError
        If required keys are missing from the configuration file.
    """
    workstation_config = self.workstation_configs / (name + ".yml")

    if not workstation_config.exists():
        raise FileNotFoundError(
            f"Configuration {name} not found, please check if {workstation_config} exists."
        )

    with open(workstation_config, "r") as file:
        contents = yaml.safe_load(file)

    # check that project, name, location, cluster, and config are in the file
    # For the error say what keys are missing
    if not all(
        key in contents
        for key in ["project", "name", "location", "cluster", "config"]
    ):
        missing_keys = [
            key
            for key in ["project", "name", "location", "cluster", "config"]
            if key not in contents
        ]
        raise KeyError(f"Configuration file {name} is missing keys {missing_keys}")

    return contents

write_configuration(project, name, location, cluster, config)

Write the configuration to a YAML file.

Parameters:

Name Type Description Default
project str

The project name.

required
name str

The name of the workstation.

required
location str

The location of the workstation.

required
cluster str

The cluster associated with the workstation.

required
config str

The specific configuration settings.

required

Returns:

Type Description
Path

The path to the written YAML file.

Raises:

Type Description
Exception

If any error occurs during the writing process.

Source code in src/workstation/config.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def write_configuration(
    self, project: str, name: str, location: str, cluster: str, config: str
) -> Path:
    """Write the configuration to a YAML file.

    Parameters
    ----------
    project : str
        The project name.
    name : str
        The name of the workstation.
    location : str
        The location of the workstation.
    cluster : str
        The cluster associated with the workstation.
    config : str
        The specific configuration settings.

    Returns
    -------
    Path
        The path to the written YAML file.

    Raises
    ------
    Exception
        If any error occurs during the writing process.
    """
    self.workstation_configs.mkdir(parents=True, exist_ok=True)

    current_dir = Path.cwd()
    os.chdir(self.workstation_configs)
    try:
        workstation = WorkstationConfig(
            project=project,
            name=name,
            location=location,
            cluster=cluster,
            config=config,
        )

        workstation_path = workstation.generate_workstation_yml()
        return self.workstation_configs / workstation_path
    except Exception as e:
        os.chdir(current_dir)
        raise e

write_ssh_config(name, user, project, cluster, config, region)

Write the SSH configuration for the workstation.

Parameters:

Name Type Description Default
name str

The name of the workstation.

required
user str

The user for SSH connection.

required
project str

The project name.

required
cluster str

The cluster associated with the workstation.

required
config str

The specific configuration settings.

required
region str

The region where the workstation is deployed.

required

Raises:

Type Description
NoPortFree

If no free port is found after checking 20 ports.

Source code in src/workstation/config.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def write_ssh_config(
    self,
    name: str,
    user: str,
    project: str,
    cluster: str,
    config: str,
    region: str,
):
    """Write the SSH configuration for the workstation.

    Parameters
    ----------
    name : str
        The name of the workstation.
    user : str
        The user for SSH connection.
    project : str
        The project name.
    cluster : str
        The cluster associated with the workstation.
    config : str
        The specific configuration settings.
    region : str
        The region where the workstation is deployed.

    Raises
    ------
    NoPortFree
        If no free port is found after checking 20 ports.
    """
    workstation_config = self.workstation_configs / (name + ".config")

    # get all of the ports that are currently in use from the config files
    ports = []
    for config_file in self.workstation_configs.glob("*.config"):
        with open(config_file, "r") as file:
            contents = file.read()
            # Check if the match is not None before calling group
            match = re.search(r"\n\s*Port\s+(\d+)", contents)
            if match is not None:
                port = int(match.group(1))
                ports.append(port)

    if len(ports) == 0:
        port = 6000
    else:
        port = max(ports) + 1

    for _ in range(20):
        if check_socket("localhost", port):
            break
        port += 1
    else:
        raise NoPortFree("Could not find a free port after checking 20 ports.")

    proxy_command = (
        "sh -c '"
        "cleanup() { pkill -P $$; }; "
        "trap cleanup EXIT; "
        "gcloud workstations start-tcp-tunnel "
        f"--project={project} "
        f"--cluster={cluster} "
        f"--config={config} "
        f"--region={region} "
        "--local-host-port=localhost:%p %h 22 & "
        "timeout=10; "
        "while ! nc -z localhost %p; do "
        "sleep 1; "
        "timeout=$((timeout - 1)); "
        "if [ $timeout -le 0 ]; then "
        "exit 1; "
        "fi; "
        "done; "
        "nc localhost %p'"
    )

    config_content = dedent(
        f"""
        Host {name}
            HostName {name}
            Port {port}
            User {user}
            StrictHostKeyChecking no
            UserKnownHostsFile /dev/null
            ControlMaster auto
            ControlPersist 30m
            ControlPath ~/.ssh/cm/%r@%h:%p
            ProxyCommand {proxy_command}
            """
    ).strip()

    with open(workstation_config, "w") as file:
        file.write(config_content)

WorkstationConfig dataclass

A class to represent a Workstation’s configuration.

Attributes:

Name Type Description
name str

The name of the workstation.

location str

The location where the workstation is deployed.

cluster str

The cluster associated with the workstation.

config str

The specific configuration settings of the workstation.

project str

The project associated with the workstation.

Methods:

Name Description
generate_workstation_yml

Generates a YAML configuration file for the workstation and saves it to the current directory.

Source code in src/workstation/config.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@dataclass
class WorkstationConfig:
    """A class to represent a Workstation's configuration.

    Attributes
    ----------
    name : str
        The name of the workstation.
    location : str
        The location where the workstation is deployed.
    cluster : str
        The cluster associated with the workstation.
    config : str
        The specific configuration settings of the workstation.
    project : str
        The project associated with the workstation.

    Methods
    -------
    generate_workstation_yml() -> Path
        Generates a YAML configuration file for the workstation and saves it to the current directory.
    """

    name: str
    location: str
    cluster: str
    config: str
    project: str

    def generate_workstation_yml(self) -> Path:
        """Generate a YAML configuration file for the workstation.

        Returns
        -------
        Path
            The path to the generated YAML file.
        """
        write_path = Path(".", f"{self.name}.yml")
        with open(write_path, "w") as file:
            yaml.dump(asdict(self), file, sort_keys=False)

        return write_path

generate_workstation_yml()

Generate a YAML configuration file for the workstation.

Returns:

Type Description
Path

The path to the generated YAML file.

Source code in src/workstation/config.py
44
45
46
47
48
49
50
51
52
53
54
55
56
def generate_workstation_yml(self) -> Path:
    """Generate a YAML configuration file for the workstation.

    Returns
    -------
    Path
        The path to the generated YAML file.
    """
    write_path = Path(".", f"{self.name}.yml")
    with open(write_path, "w") as file:
        yaml.dump(asdict(self), file, sort_keys=False)

    return write_path

core

create_workstation(project, location, cluster, config, name, account, user, proxy=None, no_proxy=None, envs=None)

Create a new workstation with the specified configuration.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
location str

The Google Cloud location.

required
cluster str

The workstation cluster name.

required
config str

The workstation configuration name.

required
name str

The name of the new workstation.

required
account str

The account associated with the workstation.

required
user str

The user associated with the workstation.

required
proxy Optional[str]

Proxy settings, by default None.

None
no_proxy Optional[str]

No-proxy settings, by default None.

None
envs Optional[Tuple[Tuple[str, str]]]

Additional environment variables to set, by default None.

None

Returns:

Type Description
Workstation

Response from the workstation creation request.

Source code in src/workstation/core.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def create_workstation(
    project: str,
    location: str,
    cluster: str,
    config: str,
    name: str,
    account: str,
    user: str,
    proxy: Optional[str] = None,
    no_proxy: Optional[str] = None,
    envs: Optional[Tuple[Tuple[str, str]]] = None,
) -> Workstation:
    """
    Create a new workstation with the specified configuration.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    location : str
        The Google Cloud location.
    cluster : str
        The workstation cluster name.
    config : str
        The workstation configuration name.
    name : str
        The name of the new workstation.
    account : str
        The account associated with the workstation.
    user : str
        The user associated with the workstation.
    proxy : Optional[str], optional
        Proxy settings, by default None.
    no_proxy : Optional[str], optional
        No-proxy settings, by default None.
    envs : Optional[Tuple[Tuple[str, str]]], optional
        Additional environment variables to set, by default None.

    Returns
    -------
    Workstation
        Response from the workstation creation request.
    """
    client = workstations_v1beta.WorkstationsClient()
    env = {
        "LDAP": user,
        "ACCOUNT": account,
    }

    if proxy:
        env["http_proxy"] = proxy
        env["HTTPS_PROXY"] = proxy
        env["https_proxy"] = proxy
        env["HTTP_PROXY"] = proxy
        env["no_proxy"] = no_proxy
        env["NO_PROXY"] = no_proxy

    if envs:
        user_envs = dict(envs)
        # ensure that no duplicate keys are added to env
        for key, value in user_envs.items():
            if key not in env:
                env[key] = value
            else:
                logger.warning(
                    f"Environment variable {key} already exists in the environment, skipping"
                )

    request = workstations_v1beta.CreateWorkstationRequest(
        parent=f"projects/{project}/locations/{location}/workstationClusters/{cluster}/workstationConfigs/{config}",
        workstation_id=name,
        workstation=Workstation(
            display_name=name,
            env=env,
        ),
    )

    try:
        operation = client.create_workstation(request=request)
        response = operation.result()
    except AlreadyExists:
        console.print(f"Workstation [bold blue]{name}[/bold blue] already exists")
        sys.exit(1)

    config_manager.write_configuration(
        project=project,
        name=name,
        location=location,
        cluster=cluster,
        config=config,
    )

    return response

delete_workstation(project, name, location, cluster, config)

Delete an existing workstation.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
name str

The name of the workstation.

required
location str

The Google Cloud location.

required
cluster str

The workstation cluster name.

required
config str

The workstation configuration name.

required

Returns:

Type Description
Operation

Response from the workstation deletion request.

Source code in src/workstation/core.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def delete_workstation(
    project: str,
    name: str,
    location: str,
    cluster: str,
    config: str,
) -> Operation:
    """
    Delete an existing workstation.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    name : str
        The name of the workstation.
    location : str
        The Google Cloud location.
    cluster : str
        The workstation cluster name.
    config : str
        The workstation configuration name.

    Returns
    -------
    Operation
        Response from the workstation deletion request.
    """
    client = workstations_v1beta.WorkstationsClient()

    request = workstations_v1beta.DeleteWorkstationRequest(
        name=f"projects/{project}/locations/{location}/workstationClusters/{cluster}/workstationConfigs/{config}/workstations/{name}",
    )

    operation = client.delete_workstation(request=request)
    console.print("Waiting for operation to complete...")
    response = operation.result()

    return response

list_workstation_clusters(project, location)

List workstation clusters in a specific project and location.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
location str

The Google Cloud location.

required

Returns:

Type Description
List[Dict]

A list of workstation cluster configurations.

Source code in src/workstation/core.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def list_workstation_clusters(project: str, location: str) -> List[Dict]:
    """
    List workstation clusters in a specific project and location.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    location : str
        The Google Cloud location.

    Returns
    -------
    List[Dict]
        A list of workstation cluster configurations.
    """
    client = workstations_v1beta.WorkstationsClient()

    request = workstations_v1beta.ListWorkstationClustersRequest(
        parent=f"projects/{project}/locations/{location}",
    )
    page_result = client.list_workstation_clusters(request=request)

    configs = []
    for config in page_result:
        configs.append(
            {
                "name": config.name,
                "image": config.subnetwork,
            }
        )
    return configs

list_workstation_configs(project, location, cluster)

List usable workstation configurations in a specific project, location, and cluster.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
location str

The Google Cloud location.

required
cluster str

The workstation cluster name.

required

Returns:

Type Description
List[Dict]

A list of usable workstation configurations.

Source code in src/workstation/core.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def list_workstation_configs(project: str, location: str, cluster: str) -> List[Dict]:
    """
    List usable workstation configurations in a specific project, location, and cluster.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    location : str
        The Google Cloud location.
    cluster : str
        The workstation cluster name.

    Returns
    -------
    List[Dict]
        A list of usable workstation configurations.
    """
    client = workstations_v1beta.WorkstationsClient()

    request = workstations_v1beta.ListUsableWorkstationConfigsRequest(
        parent=f"projects/{project}/locations/{location}/workstationClusters/{cluster}",
    )
    page_result = client.list_usable_workstation_configs(request=request)

    configs = []
    for config in page_result:
        if config.host.gce_instance.machine_type not in machine_types:
            logger.debug(
                f"{config.host.gce_instance.machine_type} not exist in machine_types in machines.py"
            )
            continue
        machine_details = machine_types[config.host.gce_instance.machine_type]
        machine_specs = f"machine_specs[{machine_details['vCPUs']} vCPUs, {machine_details['Memory (GB)']} GB]"
        configs.append(
            {
                "name": config.name,
                "image": config.container.image,
                "machine_type": config.host.gce_instance.machine_type,
                "idle_timeout": config.idle_timeout.total_seconds(),
                "max_runtime": config.running_timeout.total_seconds(),
                "machine_specs": machine_specs,
            }
        )
    return configs

list_workstations(project, location, cluster)

List all workstations in a specific project, location, and cluster.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
location str

The Google Cloud location.

required
cluster str

The workstation cluster name.

required

Returns:

Type Description
List[Dict]

A list of workstation configurations.

Source code in src/workstation/core.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def list_workstations(project: str, location: str, cluster: str) -> List[Dict]:
    """
    List all workstations in a specific project, location, and cluster.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    location : str
        The Google Cloud location.
    cluster : str
        The workstation cluster name.

    Returns
    -------
    List[Dict]
        A list of workstation configurations.
    """
    configs = list_workstation_configs(
        project=project, location=location, cluster=cluster
    )

    client = workstations_v1beta.WorkstationsClient()
    workstations = []

    for config in configs:
        request = workstations_v1beta.ListWorkstationsRequest(
            parent=config.get("name"),
        )

        page_result = client.list_workstations(request=request)

        for workstation in page_result:
            workstations.append(
                {
                    "name": workstation.name,
                    "state": workstation.state,
                    "env": workstation.env,
                    "config": config,
                    "project": project,
                    "location": location,
                    "cluster": cluster,
                }
            )
    return workstations

start_workstation(project, name, location, cluster, config)

Start an existing workstation.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
name str

The name of the workstation.

required
location str

The Google Cloud location.

required
cluster str

The workstation cluster name.

required
config str

The workstation configuration name.

required

Returns:

Type Description
Operation

Response from the workstation start request.

Source code in src/workstation/core.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def start_workstation(
    project: str,
    name: str,
    location: str,
    cluster: str,
    config: str,
) -> Operation:
    """
    Start an existing workstation.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    name : str
        The name of the workstation.
    location : str
        The Google Cloud location.
    cluster : str
        The workstation cluster name.
    config : str
        The workstation configuration name.

    Returns
    -------
    Operation
        Response from the workstation start request.
    """
    client = workstations_v1beta.WorkstationsClient()

    request = workstations_v1beta.StartWorkstationRequest(
        name=f"projects/{project}/locations/{location}/workstationClusters/{cluster}/workstationConfigs/{config}/workstations/{name}",
    )

    operation = client.start_workstation(request=request)
    console.print("Waiting for operation to complete (~3 minutes)...")
    response = operation.result()

    return response

stop_workstation(project, name, location, cluster, config)

Stop an existing workstation.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
name str

The name of the workstation.

required
location str

The Google Cloud location.

required
cluster str

The workstation cluster name.

required
config str

The workstation configuration name.

required

Returns:

Type Description
Operation

Response from the workstation stop request.

Source code in src/workstation/core.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def stop_workstation(
    project: str,
    name: str,
    location: str,
    cluster: str,
    config: str,
) -> Operation:
    """
    Stop an existing workstation.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    name : str
        The name of the workstation.
    location : str
        The Google Cloud location.
    cluster : str
        The workstation cluster name.
    config : str
        The workstation configuration name.

    Returns
    -------
    Operation
        Response from the workstation stop request.
    """
    client = workstations_v1beta.WorkstationsClient()

    request = workstations_v1beta.StopWorkstationRequest(
        name=f"projects/{project}/locations/{location}/workstationClusters/{cluster}/workstationConfigs/{config}/workstations/{name}",
    )

    operation = client.stop_workstation(request=request)
    console.print("Waiting for operation to complete...")
    response = operation.result()

    return response

utils

NoPortFree

Bases: Exception

Exception raised when no free port is available for the SSH tunnel.

Source code in src/workstation/utils.py
220
221
222
223
class NoPortFree(Exception):
    """Exception raised when no free port is available for the SSH tunnel."""

    pass

check_gcloud_auth()

Check if the current gcloud CLI is authenticated and refresh if necessary.

Returns:

Type Description
bool

True if authentication is successful, False otherwise.

Raises:

Type Description
SystemExit

If reauthentication is needed.

Source code in src/workstation/utils.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def check_gcloud_auth():
    """
    Check if the current gcloud CLI is authenticated and refresh if necessary.

    Returns
    -------
    bool
        True if authentication is successful, False otherwise.

    Raises
    ------
    SystemExit
        If reauthentication is needed.
    """
    from google.auth import default

    try:
        credentials, project = google.auth.default()

        # Check if the credentials are valid and refresh if necessary
        if credentials.requires_scopes:
            credentials = credentials.with_scopes(
                ["https://www.googleapis.com/auth/cloud-platform"]
            )

        credentials.refresh(Request())
        return True

    except (DefaultCredentialsError, RefreshError):
        console.print(
            "Reauthentication is needed. Please run [bold blue]gcloud auth login & gcloud auth application-default login[/bold blue]."
        )
        sys.exit(1)

check_socket(host, port)

Check if a socket on the given host and port is available.

Parameters:

Name Type Description Default
host str

The hostname or IP address.

required
port int

The port number.

required

Returns:

Type Description
bool

True if the socket is available, False otherwise.

Source code in src/workstation/utils.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def check_socket(host, port):
    """
    Check if a socket on the given host and port is available.

    Parameters
    ----------
    host : str
        The hostname or IP address.
    port : int
        The port number.

    Returns
    -------
    bool
        True if the socket is available, False otherwise.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
        return True
    except socket.error:
        return False
    finally:
        s.close()

config_tree(configs)

Generate a tree structure for displaying workstation configurations using Rich library.

Parameters:

Name Type Description Default
configs list

A list of workstation configurations.

required

Returns:

Type Description
Tree

A Rich Tree object representing the configurations.

Source code in src/workstation/utils.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def config_tree(configs: list) -> Tree:
    """
    Generate a tree structure for displaying workstation configurations using Rich library.

    Parameters
    ----------
    configs : list
        A list of workstation configurations.

    Returns
    -------
    Tree
        A Rich Tree object representing the configurations.
    """
    tree = Tree("Configs", style="bold blue")

    for config in configs:
        config_branch = tree.add(f"Config: {config['name'].split('/')[-1]}")
        config_branch.add(f":minidisc: Image: {config['image']}")
        config_branch.add(f":computer: Machine Type: {config['machine_type']}")
        config_branch.add(f":computer: Machine Specs: {config['machine_specs']}")
        config_branch.add(
            f":hourglass_flowing_sand: Idle Timeout (s): {str(config['idle_timeout'])}"
        )
        config_branch.add(
            f":hourglass_flowing_sand: Max Runtime (s): {str(config['max_runtime'])}"
        )

    return tree

default_serializer(obj)

Handle specific object types that are not serializable by default.

Parameters:

Name Type Description Default
obj Any

The object to serialize.

required

Returns:

Type Description
Any

Serialized object (e.g., dictionary).

Raises:

Type Description
TypeError

If the object type is not serializable.

Source code in src/workstation/utils.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def default_serializer(obj):
    """
    Handle specific object types that are not serializable by default.

    Parameters
    ----------
    obj : Any
        The object to serialize.

    Returns
    -------
    Any
        Serialized object (e.g., dictionary).

    Raises
    ------
    TypeError
        If the object type is not serializable.
    """
    # Handle protobuf ScalarMapContainer
    if hasattr(obj, "MapContainer") or "google._upb._message.ScalarMapContainer" in str(
        type(obj)
    ):
        # Convert and filter out non-essential attributes
        return {
            key: value for key, value in obj.__dict__.items() if key != "MapContainer"
        }
    raise TypeError(f"Type {type(obj)} not serializable")

get_instance_assignment(project, name)

Get the instance assignment log entries for a specific workstation.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
name str

The name of the workstation.

required

Returns:

Type Description
Dict

A dictionary of log entries related to the instance assignment.

Source code in src/workstation/utils.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def get_instance_assignment(project: str, name: str):
    """
    Get the instance assignment log entries for a specific workstation.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    name : str
        The name of the workstation.

    Returns
    -------
    Dict
        A dictionary of log entries related to the instance assignment.
    """
    check_gcloud_auth()
    client = cloud_logging.Client(project=project)

    timestamp = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()

    filter_str = (
        f'logName="projects/{project}/logs/workstations.googleapis.com%2Fvm_assignments" '
        f'AND timestamp >= "{timestamp}"'
    )

    entries = client.list_entries(filter_=filter_str)

    log_entries_dict = {}

    for entry in entries:
        try:
            workstation_id, log_entry = process_entry(entry, project)
            log_entries_dict[workstation_id] = log_entry
            if workstation_id == name:
                return log_entries_dict
        except Exception as exc:
            print(f"Entry {entry} generated an exception: {exc}")

    return log_entries_dict

get_logger()

Set log level from LOG_LEVEL environment variable, default to INFO.

This is useful for debugging purpose. The value of LOG_LEVEL should be one of these: ‘DEBUG’, ‘INFO’, ‘WARNING’, ‘ERROR’, ‘CRITICAL’.

Source code in src/workstation/utils.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def get_logger():
    """
    Set log level from LOG_LEVEL environment variable, default to INFO.

    This is useful for debugging purpose.
    The value of LOG_LEVEL should be one of these: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'.
    """
    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)

    # Avoid adding multiple handlers
    if not logger.handlers:
        handler = logging.StreamHandler()  # Log to console
        handler.setLevel(log_level)
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger

process_entry(entry, project)

Process a log entry to extract workstation information.

Parameters:

Name Type Description Default
entry

A log entry object.

required
project str

The Google Cloud project ID.

required

Returns:

Type Description
Tuple[str, Dict]

Workstation ID and a dictionary with instance information.

Source code in src/workstation/utils.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def process_entry(entry, project):
    """
    Process a log entry to extract workstation information.

    Parameters
    ----------
    entry
        A log entry object.
    project : str
        The Google Cloud project ID.

    Returns
    -------
    Tuple[str, Dict]
        Workstation ID and a dictionary with instance information.
    """
    workstation_id = entry.resource.labels.get("workstation_id")
    instance_name = entry.labels.get("instance_name")
    instance_id = entry.labels.get("instance_id")

    resource_type = "gce_instance"
    base_url = f"https://console.cloud.google.com/logs/query;query=resource.type%3D%22{resource_type}%22%0Aresource.labels.instance_id%3D%22"
    url = f"{base_url}{instance_id}%22?project={project}"

    log_entry = {
        "instance_name": instance_name,
        "instance_id": instance_id,
        "logs_url": url,
    }

    return workstation_id, log_entry

read_gcloud_config()

Read the default Google Cloud configuration.

Returns:

Type Description
Tuple[str, str, str]

Default project ID, location, and account from gcloud configuration.

Source code in src/workstation/utils.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def read_gcloud_config():
    """
    Read the default Google Cloud configuration.

    Returns
    -------
    Tuple[str, str, str]
        Default project ID, location, and account from gcloud configuration.
    """
    config_path = os.path.expanduser("~/.config/gcloud/configurations/config_default")
    config = configparser.ConfigParser()
    config.read(config_path)

    # Assuming the default settings are under the 'core' section
    default_project = config.get("core", "project", fallback=None)
    default_location = config.get("compute", "region", fallback=None)
    account = config.get("core", "account", fallback=None)

    return default_project, default_location, account

sync_files_workstation(project, name, location, cluster, config, source, destination)

Synchronize files from the local system to the workstation using rsync over an SSH tunnel.

Parameters:

Name Type Description Default
project str

The Google Cloud project ID.

required
name str

The name of the workstation.

required
location str

The Google Cloud location.

required
cluster str

The workstation cluster name.

required
config str

The workstation configuration name.

required
source str

The source directory on the local system.

required
destination str

The destination directory on the workstation.

required

Returns:

Type Description
CompletedProcess

The result of the rsync command.

Source code in src/workstation/utils.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def sync_files_workstation(
    project: str,
    name: str,
    location: str,
    cluster: str,
    config: str,
    source: str,
    destination: str,
):
    """
    Synchronize files from the local system to the workstation using rsync over an SSH tunnel.

    Parameters
    ----------
    project : str
        The Google Cloud project ID.
    name : str
        The name of the workstation.
    location : str
        The Google Cloud location.
    cluster : str
        The workstation cluster name.
    config : str
        The workstation configuration name.
    source : str
        The source directory on the local system.
    destination : str
        The destination directory on the workstation.

    Returns
    -------
    subprocess.CompletedProcess
        The result of the rsync command.
    """
    port = 61000
    for _ in range(20):
        if check_socket("localhost", port):
            break
        port += 1
    else:
        raise NoPortFree("Could not find a free port after checking 20 ports.")

    process = subprocess.Popen(
        [
            "gcloud",
            "workstations",
            "start-tcp-tunnel",
            f"--project={project}",
            f"--cluster={cluster}",
            f"--config={config}",
            f"--region={location}",
            f"--region={location}",
            f"{name}",
            "22",
            f"--local-host-port=:{port}",
        ],
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if process.poll() is not None:
        if process.returncode != 0:
            raise CalledProcessError(process.stderr.read())

    # use rsync to sync files from local to workstation
    source_path = os.path.expanduser(source)
    destination_path = f"localhost:{destination}"

    command = [
        "rsync",
        "-av",
        "--exclude=.venv",
        "--exclude=.git",
        "--exclude=.DS_Store",
        "-e",
        f"ssh -p {port} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null",
        source_path,
        destination_path,
    ]
    counter = 0
    while check_socket("localhost", port):
        if counter >= 10:
            break
        time.sleep(1)
        counter += 1

    result = subprocess.run(command, capture_output=True, text=True)
    process.kill()
    return result