tyluigiutils package

Submodules

tyluigiutils.external module

class tyluigiutils.external.ExternalFileTask(*args, **kwargs)

Bases: luigi.task.ExternalTask

Use this task to model a dependency on a file that is created outside of Luigi. The task has no run method, so if the expected file is not present the task fails; there is nothing else Luigi can do in this case.

hdfs = <luigi.parameter.Parameter object>
output()

The output that this Task produces.

The output of the Task determines if the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

output_path = <luigi.parameter.Parameter object>
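To make the completion rule concrete, here is a minimal, stdlib-only sketch of how an external dependency behaves: it counts as complete only when every expected output exists, and because there is no run method, a missing file can only be reported, never produced. The helpers outputs_exist and check_external_dependency are hypothetical and model local files only; the real task works with Luigi targets.

```python
import os
import tempfile


def outputs_exist(paths):
    """Complete only if there is at least one output and every output path exists."""
    return bool(paths) and all(os.path.exists(p) for p in paths)


def check_external_dependency(paths):
    """Model an external task: with no run() to create missing files, all it can do is fail."""
    missing = [p for p in paths if not os.path.exists(p)]
    if missing:
        raise FileNotFoundError(f"external file(s) not present: {missing}")
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        delivered = os.path.join(tmp, "export.csv")
        open(delivered, "w").close()  # simulate a file dropped by another system
        print(outputs_exist([delivered]))  # True
        print(outputs_exist([delivered, os.path.join(tmp, "late.csv")]))  # False
```

Treating an empty output list as incomplete mirrors the documented complete() behaviour ("If the task has any outputs, return True if all outputs exist. Otherwise, return False").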

tyluigiutils.hadoop module

class tyluigiutils.hadoop.JsonInputJobTask(*args, **kwargs)

Bases: luigi.contrib.hadoop.JobTask

Helper class for a Hadoop job that reads JSON input, optionally validated by a schema.

input_schema()

Override and return a Python object representing a JSON schema. If any value in the mapper input does not validate against this schema, an error is logged, and the job fails.

mapper(key, value)

Identity mapper

reader(input_stream)

Reader is a method which iterates over input lines and outputs records.

The default implementation yields one argument containing the line for each line in the input.
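A minimal sketch of a JSON-decoding reader and identity mapper, assuming each input line has the form key<TAB>json_value (consistent with the mapper(key, value) signature and the JsonJobTask description below: plain-text keys, JSON-encoded values). The names json_reader and identity_mapper are hypothetical, and the validate callable is only a stand-in for the schema check the package performs against input_schema().

```python
import json
import logging

logger = logging.getLogger(__name__)


def json_reader(input_stream, validate=None):
    """Yield (key, value) pairs from key<TAB>json lines, failing on bad input.

    validate is a hypothetical stand-in for JSON schema validation: any
    callable that raises ValueError for an invalid value.
    """
    for lineno, line in enumerate(input_stream, start=1):
        line = line.rstrip("\n")
        key, sep, raw_value = line.partition("\t")
        if not sep:
            raise ValueError(f"line {lineno}: expected key<TAB>json, got {line!r}")
        try:
            value = json.loads(raw_value)  # JSONDecodeError is a ValueError subclass
            if validate is not None:
                validate(value)
        except ValueError:
            # Log the offending value, then re-raise so the job fails.
            logger.error("line %d: invalid input value %r", lineno, raw_value)
            raise
        yield key, value


def identity_mapper(key, value):
    """Pass each record through unchanged."""
    yield key, value
```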

class tyluigiutils.hadoop.JsonJobTask(*args, **kwargs)

Bases: tyluigiutils.hadoop.JsonInputJobTask, tyluigiutils.hadoop.JsonOutputJobTask

Hadoop job which reads and writes JSON lines. Keys are plain text, but values are JSON-encoded. Fails if JSON decoding or encoding fails for any input or output line.

Optionally, input and/or output can be validated against a JSON schema. To achieve this, override the input_schema and/or output_schema methods.

class tyluigiutils.hadoop.JsonOutputJobTask(*args, **kwargs)

Bases: luigi.contrib.hadoop.JobTask

Helper class for a Hadoop job that writes JSON output, optionally validated by a schema.

output_schema()

Override and return a Python object representing a JSON schema. If any value output by the reducer does not validate against this schema, an error is logged, and the job fails.

writer(outputs, stdout, stderr=sys.stderr)

The writer is a method that iterates over the output records from the reducer and formats them for output.

The default implementation outputs tab-separated items.
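The JSON variant of the writer can be sketched as follows: each (key, value) record becomes a tab-separated line whose value is JSON-encoded, and an unencodable value raises, which is what makes the job fail. format_json_lines and json_writer are hypothetical names, and validate again stands in for the output_schema() check.

```python
import json
import sys


def format_json_lines(outputs, validate=None):
    """Turn (key, value) records into key<TAB>json lines."""
    for key, value in outputs:
        if validate is not None:
            validate(value)  # hypothetical stand-in for output_schema validation
        # json.dumps raises TypeError for values it cannot encode, which fails the job.
        yield f"{key}\t{json.dumps(value, sort_keys=True)}\n"


def json_writer(outputs, stdout=sys.stdout, stderr=sys.stderr):
    """Same shape as writer(outputs, stdout, stderr); stderr is accepted but unused."""
    stdout.writelines(format_json_lines(outputs))
```

Sorting keys is a choice made here so the output is deterministic; the package may not do this.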

class tyluigiutils.hadoop.TYDefaultHadoopJobRunner(**kwargs)

Bases: luigi.contrib.hadoop.HadoopJobRunner

class tyluigiutils.hadoop.VenvJobTask(*args, **kwargs)

Bases: luigi.contrib.hadoop.JobTask

Hadoop job which runs in a virtual environment.

It uses the ‘venv_path’ option in the ‘hadoop’ section of your Luigi configuration file. If venv_path is a zip file, it is used directly as the archive. If it is a directory, it is compressed and shipped with the Hadoop job. The venv can also be located directly on HDFS; use the hdfs:// URL prefix to indicate this.

finish()
job_runner()

Get the MapReduce runner for this job.

If all outputs are HdfsTargets, the DefaultHadoopJobRunner will be used. Otherwise, the LocalJobRunner which streams all data through the local machine will be used (great for testing).

libjars = None
venv_path = <luigi.parameter.Parameter object>
venv_tmp = None
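The three venv_path cases described above (zip file, directory, hdfs:// URL) can be sketched with the standard library alone. resolve_venv_archive is a hypothetical helper, not the package's code; in the real task the value comes from the venv_path option of the hadoop configuration section and the result is shipped with the job.

```python
import os
import shutil
import tempfile


def resolve_venv_archive(venv_path, work_dir=None):
    """Return the archive to ship for venv_path (hypothetical helper).

    - an hdfs:// URL is used as-is, since the archive already lives on HDFS
    - a .zip file is used directly as the archive
    - a directory is compressed into <work_dir>/<dirname>.zip first
    """
    if venv_path.startswith("hdfs://"):
        return venv_path
    if os.path.isfile(venv_path) and venv_path.endswith(".zip"):
        return venv_path
    if os.path.isdir(venv_path):
        work_dir = work_dir or tempfile.mkdtemp()
        name = os.path.basename(os.path.normpath(venv_path))
        # root_dir puts the venv's contents (bin/, lib/, ...) at the top of the archive.
        return shutil.make_archive(os.path.join(work_dir, name), "zip", root_dir=venv_path)
    raise ValueError(f"not an hdfs:// URL, .zip file or directory: {venv_path!r}")
```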

tyluigiutils.spark module

class tyluigiutils.spark.BaseTYPySparkTask(*args, **kwargs)

Bases: luigi.contrib.spark.PySparkTask

atomic_output = <luigi.parameter.BoolParameter object>
main(sc, *args)

Called by the pyspark_runner with a SparkContext and any arguments returned by app_options().

Parameters:
  • sc – SparkContext
  • args – arguments list
write_csv_output(*args, **kwargs)

Writes CSV output from a DataFrame.

Parameters:
  • df – A dataframe.
  • sc – A SparkContext.
  • sep – Separator.
  • quote – Sets the single character used for escaping quoted values where the separator can be part of the value. If None is set, the default value " (double quote) is used. To turn off quoting, set an empty string.
  • escape – Sets the single character used for escaping quotes inside an already quoted value. If None is set, the default value is used.
  • escapeQuotes – A flag indicating whether values containing quotes should always be enclosed in quotes. If None is set, the default value true is used, which escapes all values containing a quote character.
Returns:

None

write_jsonl_output(*args, **kwargs)

Writes JSON Lines output from an RDD.

Parameters:
  • rdd – An RDD.
  • sc – A SparkContext.
Returns:

None

write_parquet_output(*args, **kwargs)

Writes parquet output from a dataframe.

Parameters:
  • df – A dataframe.
  • sc – A SparkContext.
  • compression – Compression to be used.
  • mode – Write mode.
Returns:

None

class tyluigiutils.spark.SparkVenvJobTask(*args, **kwargs)

Bases: luigi.contrib.spark.PySparkTask

archives
conf
program_environment()

Override this method to control environment variables for the program.

Returns: dict mapping environment variable names to values
set_pyspark_python_in_env

If set to true, PYSPARK_PYTHON is set as part of the task environment, to {venv_name}/bin/python, where the venv name is taken from the venv_name property. This assumes the venv directory is available in the working directory. True by default.

set_pyspark_python_spark_conf

If set to true, the property spark.yarn.appMasterEnv.PYSPARK_PYTHON is also set. This requires PYSPARK_PYTHON to be available on the cluster nodes as well. Normally the task takes care of this, but you may want to double-check the path of the unzipped files. True by default.

To ensure this works properly, the path {venv_name}/bin/python must be available both on the machine running the driver (in client mode) and on the workers and the application master (in cluster mode). Setting this property is generally not necessary when working in client mode.

Example: if the venv is created in the myvenv directory of the current working directory (i.e. where the application is launched), it should also be packaged as myvenv.zip, which should contain bin/python at its top level:

$ unzip -l -qq myvenv.zip
...
437  2017-08-31 15:13   bin/python
...
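A minimal sketch tying the two flags and the archive layout together: it builds the environment and Spark configuration entries that point PySpark at {venv_name}/bin/python, and checks that a zip has bin/python at its top level, as in the listing above. All function names are hypothetical; only PYSPARK_PYTHON and spark.yarn.appMasterEnv.PYSPARK_PYTHON come from the text.

```python
import io
import zipfile


def pyspark_python_settings(venv_name, set_in_env=True, set_in_spark_conf=True):
    """Return (environment, spark_conf) dicts that point PySpark at the venv's Python."""
    python = f"{venv_name}/bin/python"  # relative path: the venv must be in the working directory
    env, conf = {}, {}
    if set_in_env:
        env["PYSPARK_PYTHON"] = python
    if set_in_spark_conf:
        # Read by the YARN application master, which matters in cluster mode.
        conf["spark.yarn.appMasterEnv.PYSPARK_PYTHON"] = python
    return env, conf


def venv_zip_has_python(zip_file):
    """True if bin/python sits at the top level of the archive (path or binary file object)."""
    with zipfile.ZipFile(zip_file) as zf:
        return "bin/python" in zf.namelist()


def demo_archive(member):
    """Build a tiny in-memory zip containing a single placeholder member."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr(member, "")
    return buf


print(pyspark_python_settings("myvenv")[0])                    # {'PYSPARK_PYTHON': 'myvenv/bin/python'}
print(venv_zip_has_python(demo_archive("bin/python")))         # True
print(venv_zip_has_python(demo_archive("myvenv/bin/python")))  # False: nested one level too deep
```

The second archive shows a common packaging mistake: zipping the venv directory itself rather than its contents.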
venv_name
venv_path

Returns the path (hdfs:// or local) of the zipped venv to use.

Implement this to return the path. It can be taken from a configuration file or a Luigi Task parameter.

Returns: the path (string) of the zipped venv file (HDFS or local)
tyluigiutils.spark.normalize_local_filename(filename)

Normalizes a local file's full path so it can be used with Python's os and file I/O functions.

Parameters: filename – File full path.
Returns: Normalized filename.
tyluigiutils.spark.prepend_paths(*paths)

Prepends paths to the Python search path before calling the function. This is useful when a function that requires a package runs on a worker: the worker might not have the right PYTHONPATH, even though the package can be found in the working directory.
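One way to implement this idea is a decorator factory that moves the given paths to the front of sys.path each time the wrapped function runs (for example inside a Spark task on a worker). This is a sketch assuming prepend_paths is used as a decorator; the paths in the usage example are hypothetical.

```python
import functools
import sys


def prepend_paths(*paths):
    """Decorator factory: put paths at the front of sys.path before each call (sketch)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Insert in reverse so the first path given ends up first in sys.path.
            for path in reversed(paths):
                if path in sys.path:
                    sys.path.remove(path)
                sys.path.insert(0, path)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@prepend_paths("./deps.zip", "./libs")
def search_path_head():
    # A real function would import a package here that only lives in the working directory.
    return sys.path[:2]
```

Doing the insertion at call time, rather than at import time, matters because the function body executes in the worker's interpreter, not the driver's.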

tyluigiutils.spark.write_decorator(write_function)

Decorator for Spark output writers.

This decorator first writes to a temporary file. If writing succeeds, it moves the temporary file to the final name. This is necessary because Spark might fail at the saveAs* stage after the final file has already been created, which would prevent Luigi from scheduling the task again after the failure.

Parameters: write_function – A writer function that saves the output.
Returns: The wrapper function.
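The write-to-temporary-then-rename pattern can be sketched on the local filesystem, which here stands in for HDFS. atomic_write is a hypothetical name, and the write_function(data, path) signature is an assumption made for illustration; the package's wrapper signature is not documented here.

```python
import functools
import os
import shutil
import tempfile
import uuid


def atomic_write(write_function):
    """Sketch of the write-to-temp-then-rename pattern for write_function(data, path)."""
    @functools.wraps(write_function)
    def wrapper(data, path):
        tmp_path = f"{path}-tmp-{uuid.uuid4().hex}"
        try:
            write_function(data, tmp_path)
        except Exception:
            # Remove partial output so the final name never appears after a failure.
            if os.path.isdir(tmp_path):
                shutil.rmtree(tmp_path)
            elif os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        os.rename(tmp_path, path)  # the final name only appears after a successful write
    return wrapper


@atomic_write
def save_lines(lines, path):
    # Stand-in for a Spark saveAs* call, which writes a directory of part files.
    os.makedirs(path)
    with open(os.path.join(path, "part-00000"), "w") as fh:
        fh.writelines(f"{line}\n" for line in lines)


@atomic_write
def save_then_crash(lines, path):
    # Simulates Spark failing after it has already created the output directory.
    os.makedirs(path)
    raise RuntimeError("simulated failure during save")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        save_lines(["a", "b"], os.path.join(tmp, "result"))
        print(sorted(os.listdir(tmp)))  # ['result']
```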

tyluigiutils.sqoop module

Gather data using Sqoop table dumps run on RDBMS databases. Adapted from: https://github.com/edx/edx-analytics-pipeline/blob/master/edx/analytics/tasks/sqoop.py

class tyluigiutils.sqoop.SqoopImportFromPgSQL(*args, **kwargs)

Bases: tyluigiutils.sqoop.SqoopImportTask

An abstract task that uses Sqoop to read data out of a database and write it to a file in CSV or Avro format.

Known Issues:
  • Sqoop does not handle the Postgres uuid type well, so uuid columns need to be cast to string when dumping. Be aware of this when splitting (split_by), as splitting on such a column will be slow for large databases. For splitting, it is better to use a numeric index or not to split at all.

connection_url(cred)

Construct connection URL from provided credentials.
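A minimal sketch of building a PostgreSQL JDBC connection URL, plus a free-form query that applies the uuid workaround from the known issue above. The credential keys (host, port, database) mirror the SqoopImportTask parameters but are assumptions about the shape of cred, and the table and column names are hypothetical.

```python
def postgres_connection_url(cred):
    """Build a PostgreSQL JDBC URL from a credentials mapping (sketch)."""
    # Hypothetical keys; extra entries such as username/password are ignored by format().
    return "jdbc:postgresql://{host}:{port}/{database}".format(**cred)


# Known-issue workaround: cast the uuid column to text in a free-form query.
# Sqoop requires the literal $CONDITIONS token in the WHERE clause of such queries.
USERS_QUERY = "SELECT id::text AS id, name FROM users WHERE $CONDITIONS"
```

When this query is passed to Sqoop as a list of arguments (no shell), $CONDITIONS reaches Sqoop unexpanded.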

import_args()

Returns the list of arguments specific to a Sqoop import from a PostgreSQL database.

schema = <luigi.parameter.Parameter object>
class tyluigiutils.sqoop.SqoopImportRunner

Bases: luigi.contrib.hadoop.JobRunner

Runs a SqoopImportTask by shelling out to sqoop.

finish(job)
run_job(job)

Runs a SqoopImportTask by shelling out to sqoop.

class tyluigiutils.sqoop.SqoopImportTask(*args, **kwargs)

Bases: luigi.contrib.hadoop.BaseHadoopJobTask

An abstract task that uses Sqoop to read data out of a database and write it to a file in one of several formats. To protect the database access credentials, they are loaded from an external file that can be secured appropriately. By default they can be read from the configuration (bad practice), but the _get_credentials method can be overridden to use a more secure approach.

Known Issues:
  • Parquet output does not work with a free-form query, so use it only when dumping a whole table. This will be solved in Sqoop 1.4.7.

as_avro = <luigi.parameter.BoolParameter object>
as_parquet = <luigi.parameter.BoolParameter object>
atomic_output = <luigi.parameter.BoolParameter object>
columns = <luigi.parameter.Parameter object>
columns_map = <luigi.parameter.Parameter object>
connection_url(_cred)

Construct connection URL from provided credentials.

database = <luigi.parameter.Parameter object>
delimiter_replacement = <luigi.parameter.Parameter object>
destination = <luigi.parameter.Parameter object>
fields_terminated_by = <luigi.parameter.Parameter object>
generic_args(password_target)

Returns list of arguments used by all Sqoop commands, using credentials read from file.

get_arglist(password_file)

Returns list of arguments for running Sqoop.
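To show roughly what such an argument list looks like, here is a sketch that assembles a sqoop import command from options corresponding to several of the task parameters. The flags (--connect, --username, --password-file, --table, --query, --columns, --where, --split-by, --num-mappers, --as-avrodatafile, --as-parquetfile, --target-dir) are standard Sqoop 1.4 import options; the function itself, its parameter names and the argument order are illustrative, not the package's implementation.

```python
def build_sqoop_import_args(connect_url, username, password_file, target_dir,
                            table_name=None, query=None, columns=None, where=None,
                            split_by=None, num_mappers=None, as_avro=False,
                            as_parquet=False, sqoop_cmd="sqoop"):
    """Assemble a sqoop import command line as a list (sketch; order is illustrative)."""
    args = [sqoop_cmd, "import",
            "--connect", connect_url,
            "--username", username,
            "--password-file", password_file]  # keeps the password off the command line
    if query:
        args += ["--query", query]
    elif table_name:
        args += ["--table", table_name]
    else:
        raise ValueError("either table_name or query is required")
    if columns:
        args += ["--columns", ",".join(columns)]
    if where:
        args += ["--where", where]
    if split_by:
        args += ["--split-by", split_by]
    if num_mappers:
        args += ["--num-mappers", str(num_mappers)]
    if as_avro:
        args.append("--as-avrodatafile")
    elif as_parquet:
        args.append("--as-parquetfile")
    args += ["--target-dir", target_dir]
    return args
```

A list like this can be handed to subprocess.run(args, check=True) without a shell, which avoids quoting problems with free-form queries.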

get_working_destination()
host = <luigi.parameter.Parameter object>
import_args()

Returns list of arguments specific to Sqoop import.

job_runner()

Uses a simple runner that gets the arguments from the job and passes them through.

metadata_output()

Return target to which metadata about the task execution can be written.

null_string = <luigi.parameter.Parameter object>
num_mappers = <luigi.parameter.Parameter object>
output()

The output that this Task produces.

The output of the Task determines if the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

password = <luigi.parameter.Parameter object>
port = <luigi.parameter.Parameter object>
query = <luigi.parameter.Parameter object>
split_by = <luigi.parameter.Parameter object>
table_name = <luigi.parameter.Parameter object>
username = <luigi.parameter.Parameter object>
verbose = <luigi.parameter.BoolParameter object>
where = <luigi.parameter.Parameter object>
working_output_path()
class tyluigiutils.sqoop.SqoopPasswordTarget

Bases: luigi.contrib.hdfs.target.HdfsTarget

Defines a temporary file in HDFS to hold the password.

tyluigiutils.sqoop.load_sqoop_cmd()

Get path to sqoop command from Luigi configuration.

tyluigiutils.url module

Adapted from https://github.com/edx/edx-analytics-pipeline/blob/master/edx/analytics/tasks/url.py

Support for URLs. Specifically, we want to be able to refer to data stored in a variety of locations and formats using a standard URL syntax. Examples:
  • s3://some-bucket/path/to/file
  • /path/to/local/file.gz
  • hdfs://some/directory/
  • file://some/local/directory

class tyluigiutils.url.ExternalURL(*args, **kwargs)

Bases: luigi.task.ExternalTask

Simple Task that returns a target based on its URL.

output()

The output that this Task produces.

The output of the Task determines if the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

url = <luigi.parameter.Parameter object>
class tyluigiutils.url.IgnoredTarget

Bases: luigi.contrib.hdfs.target.HdfsTarget

Dummy target for use in Hadoop jobs that produce no explicit output file.

exists()

Returns True if the path for this FileSystemTarget exists; False otherwise.

This method is implemented by using fs.

open(mode='r')

Open the FileSystem target.

This method returns a file-like object which can either be read from or written to depending on the specified mode.

Parameters: mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w opens it in write mode. Subclasses can implement additional options.
class tyluigiutils.url.UncheckedExternalURL(*args, **kwargs)

Bases: tyluigiutils.url.ExternalURL

An ExternalURL task that does not verify whether the source file exists, a check that can be expensive for S3 URLs.

complete()

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.

tyluigiutils.url.get_absulute_hdfs_dir()
tyluigiutils.url.get_absulute_hdfs_dir_url()
tyluigiutils.url.get_absulute_hdfs_dir_url_with_date(d)
tyluigiutils.url.get_hdfs_dir_url()
tyluigiutils.url.get_hdfs_target_from_rel_path(path, use_hdfs_dir=False)

Transforms a relative path into an HDFS URL.

Parameters: use_hdfs_dir – If True, the hdfs_dir configuration is used to build the path.

tyluigiutils.url.get_hdfs_target_with_date(filename)

Returns the default Target, formatted as hdfs:///user/{username}/{hdfs_dir}/{date}/{filename}.

tyluigiutils.url.get_hdfs_target_with_specific_date(filename, date)

Returns the default Target, formatted as hdfs:///user/{username}/{hdfs_dir}/{date}/{filename}, where {date} is the given date.
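The documented layout can be sketched as a plain string builder. This is a hypothetical helper, not the package's function: in the package, username and hdfs_dir presumably come from the environment and configuration, and both the ISO 8601 date rendering and the today-by-default behaviour are assumptions.

```python
import datetime


def hdfs_url_with_date(filename, username, hdfs_dir, date=None):
    """Build hdfs:///user/{username}/{hdfs_dir}/{date}/{filename} (sketch)."""
    date = date or datetime.date.today()  # assumption: defaults to today
    # Assumption: dates are rendered as ISO 8601 (YYYY-MM-DD).
    return f"hdfs:///user/{username}/{hdfs_dir}/{date.isoformat()}/{filename}"
```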

tyluigiutils.url.get_hdfs_url_with_date(filename)
tyluigiutils.url.get_hdfs_url_with_specific_date(filename, date)
tyluigiutils.url.get_s3_bucket_url(bucket)
tyluigiutils.url.get_s3_url_with_date(filename)
tyluigiutils.url.get_target_class_from_url(url)

Returns a Luigi target class based on the URL scheme.

tyluigiutils.url.get_target_from_url(url)

Returns a Luigi target based on the URL scheme.
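Scheme-based dispatch can be sketched with urllib.parse.urlsplit. To keep the example runnable without Luigi installed, it maps schemes to the dotted names of Luigi's target classes instead of importing them. The mapping is a hypothetical illustration covering the module's example URLs, not the package's actual table.

```python
from urllib.parse import urlsplit

# Hypothetical mapping from URL scheme to the dotted name of a Luigi target class.
TARGET_CLASS_BY_SCHEME = {
    "": "luigi.local_target.LocalTarget",  # bare paths such as /path/to/local/file.gz
    "file": "luigi.local_target.LocalTarget",
    "hdfs": "luigi.contrib.hdfs.HdfsTarget",
    "s3": "luigi.contrib.s3.S3Target",
}


def target_class_name_for_url(url):
    """Pick a target class name from the URL scheme, rejecting unknown schemes."""
    scheme = urlsplit(url).scheme
    try:
        return TARGET_CLASS_BY_SCHEME[scheme]
    except KeyError:
        raise ValueError(f"unsupported URL scheme: {scheme!r}") from None
```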

tyluigiutils.url.normalize_hdfs_url(hdfs_path)

Normalizes an HDFS URL to return an HDFS path.

tyluigiutils.url.url_path_join(url, *extra_path)

Extends the path component of the given URL. Relative paths extend the existing path; absolute paths replace it. Special path elements like ‘.’ and ‘..’ are not treated any differently from any other path element.

Examples:
  • url=http://foo.com/bar, extra_path=baz -> http://foo.com/bar/baz
  • url=http://foo.com/bar, extra_path=/baz -> http://foo.com/baz
  • url=http://foo.com/bar, extra_path=../baz -> http://foo.com/bar/../baz
Parameters:
  • url (str) – The URL to modify.
  • extra_path (str) – The path to join with the current URL path.
Returns:

The URL with the path component joined with the extra_path argument.
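The documented behaviour falls out naturally from splitting the URL and joining only its path with posixpath.join: relative pieces are appended, an absolute piece replaces the path, and '..' is kept verbatim. This is a sketch that reproduces the three examples above; the package's implementation may differ in detail.

```python
import posixpath
from urllib.parse import urlsplit, urlunsplit


def url_path_join(url, *extra_path):
    """Join extra_path onto the path component of url, leaving the other parts intact."""
    scheme, netloc, path, query, fragment = urlsplit(url)
    # posixpath.join appends relative parts, restarts at absolute ones, and never resolves '..'.
    joined = posixpath.join(path, *extra_path)
    return urlunsplit((scheme, netloc, joined, query, fragment))
```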

tyluigiutils.path module

tyluigiutils.path.generate_temporary_path_name(path, num=573278109)

Returns a temporary path.

Parameters:
  • path – Original path.
  • num – random number
Returns:

Temporary path.
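The rendered signature shows a concrete default for num (573278109). That pattern usually means the default was evaluated once, when the module was loaded, in which case calls that omit num reuse the same number for a given path. The sketch below is a hypothetical variant, temporary_path_name, that draws a fresh number per call. The -temp- suffix format is an assumption, not the package's naming scheme.

```python
import random


def temporary_path_name(path, num=None):
    """Return <path>-temp-<num> as a sibling temporary name (suffix format is hypothetical)."""
    if num is None:
        # Draw a new number on every call rather than once when the module is imported.
        num = random.randint(0, 10**9)
    return f"{path.rstrip('/')}-temp-{num}"
```

With the documented function, passing num explicitly is a simple way to get distinct temporary names for repeated writes to the same path.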

Module contents

Top-level package for TrustYou Luigi Utilities.